http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java new file mode 100644 index 0000000..7764e67 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedSnapshotBase.java @@ -0,0 +1,555 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.core.types; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; + +import com.google.common.collect.Maps; +import org.apache.commons.collections.map.DefaultedMap; +import org.apache.fluo.api.client.SnapshotBase; +import org.apache.fluo.api.config.ScannerConfiguration; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.data.RowColumn; +import org.apache.fluo.api.iterator.RowIterator; +import org.apache.fluo.recipes.core.types.TypeLayer.Data; +import org.apache.fluo.recipes.core.types.TypeLayer.FamilyMethods; +import org.apache.fluo.recipes.core.types.TypeLayer.QualifierMethods; +import org.apache.fluo.recipes.core.types.TypeLayer.RowMethods; + +// TODO need to refactor column to use Encoder + +/** + * A {@link SnapshotBase} that uses a {@link TypeLayer} + * + * @since 1.0.0 + */ +public class TypedSnapshotBase implements SnapshotBase { + + private SnapshotBase snapshot; + private Encoder encoder; + private TypeLayer tl; + + /** + * @since 1.0.0 + */ + public class VisibilityMethods extends Value { + + VisibilityMethods(Data data) { + super(data); + } + + public Value vis(Bytes cv) { + data.vis = cv; + return new Value(data); + } + + public Value vis(byte[] cv) { + data.vis = Bytes.of(cv); + return new Value(data); + } + + public Value vis(ByteBuffer bb) { + data.vis = Bytes.of(bb); + return new Value(data); + } + + public Value vis(String cv) { + data.vis = Bytes.of(cv); + return new Value(data); + } + } + + /** + * @since 1.0.0 + */ + public class Value { + private Bytes bytes; + private boolean gotBytes = false; + Data data; + + public Bytes getBytes() { + if (!gotBytes) { + try { + bytes = snapshot.get(data.row, data.getCol()); + gotBytes = true; 
+ } catch (Exception e) { + if (e instanceof RuntimeException) { + throw (RuntimeException) e; + } + throw new RuntimeException(e); + } + } + + return bytes; + } + + private Value(Bytes bytes) { + this.bytes = bytes; + this.gotBytes = true; + } + + private Value(Data data) { + this.data = data; + this.gotBytes = false; + } + + public Integer toInteger() { + if (getBytes() == null) { + return null; + } + return encoder.decodeInteger(getBytes()); + } + + public int toInteger(int defaultValue) { + if (getBytes() == null) { + return defaultValue; + } + return encoder.decodeInteger(getBytes()); + } + + public Long toLong() { + if (getBytes() == null) { + return null; + } + return encoder.decodeLong(getBytes()); + } + + public long toLong(long defaultValue) { + if (getBytes() == null) { + return defaultValue; + } + return encoder.decodeLong(getBytes()); + } + + @Override + public String toString() { + if (getBytes() == null) { + return null; + } + return encoder.decodeString(getBytes()); + } + + public String toString(String defaultValue) { + if (getBytes() == null) { + return defaultValue; + } + return encoder.decodeString(getBytes()); + } + + public Float toFloat() { + if (getBytes() == null) { + return null; + } + return encoder.decodeFloat(getBytes()); + } + + public float toFloat(float defaultValue) { + if (getBytes() == null) { + return defaultValue; + } + return encoder.decodeFloat(getBytes()); + } + + public Double toDouble() { + if (getBytes() == null) { + return null; + } + return encoder.decodeDouble(getBytes()); + } + + public double toDouble(double defaultValue) { + if (getBytes() == null) { + return defaultValue; + } + return encoder.decodeDouble(getBytes()); + } + + public Boolean toBoolean() { + if (getBytes() == null) { + return null; + } + return encoder.decodeBoolean(getBytes()); + } + + public boolean toBoolean(boolean defaultValue) { + if (getBytes() == null) { + return defaultValue; + } + return encoder.decodeBoolean(getBytes()); + } + + public 
byte[] toBytes() { + if (getBytes() == null) { + return null; + } + return getBytes().toArray(); + } + + public byte[] toBytes(byte[] defaultValue) { + if (getBytes() == null) { + return defaultValue; + } + return getBytes().toArray(); + } + + public ByteBuffer toByteBuffer() { + if (getBytes() == null) { + return null; + } + return ByteBuffer.wrap(getBytes().toArray()); + } + + public ByteBuffer toByteBuffer(ByteBuffer defaultValue) { + if (getBytes() == null) { + return defaultValue; + } + return toByteBuffer(); + } + + @Override + public int hashCode() { + if (getBytes() == null) { + return 0; + } + + return getBytes().hashCode(); + } + + @Override + public boolean equals(Object o) { + if (o instanceof Value) { + Value ov = (Value) o; + if (getBytes() == null) { + return ov.getBytes() == null; + } else { + return getBytes().equals(ov.getBytes()); + } + } + + return false; + } + } + + /** + * @since 1.0.0 + */ + public class ValueQualifierBuilder extends QualifierMethods<VisibilityMethods> { + + ValueQualifierBuilder(Data data) { + tl.super(data); + } + + @Override + VisibilityMethods create(Data data) { + return new VisibilityMethods(data); + } + } + + /** + * @since 1.0.0 + */ + public class ValueFamilyMethods extends FamilyMethods<ValueQualifierBuilder, Value> { + + ValueFamilyMethods(Data data) { + tl.super(data); + } + + @Override + ValueQualifierBuilder create1(Data data) { + return new ValueQualifierBuilder(data); + } + + @Override + Value create2(Data data) { + return new Value(data); + } + + public Map<Column, Value> columns(Set<Column> columns) { + try { + return wrap(snapshot.get(data.row, columns)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + public Map<Column, Value> columns(Column... 
columns) { + try { + return wrap(snapshot.get(data.row, new HashSet<>(Arrays.asList(columns)))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + /** + * @since 1.0.0 + */ + public class MapConverter { + private Collection<Bytes> rows; + private Set<Column> columns; + + public MapConverter(Collection<Bytes> rows, Set<Column> columns) { + this.rows = rows; + this.columns = columns; + } + + private Map<Bytes, Map<Column, Bytes>> getInput() { + try { + return snapshot.get(rows, columns); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private Map wrap2(Map m) { + return Collections.unmodifiableMap(DefaultedMap.decorate(m, new DefaultedMap(new Value( + (Bytes) null)))); + } + + @SuppressWarnings("unchecked") + public Map<String, Map<Column, Value>> toStringMap() { + Map<Bytes, Map<Column, Bytes>> in = getInput(); + Map<String, Map<Column, Value>> out = new HashMap<>(); + + for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) { + out.put(encoder.decodeString(rowEntry.getKey()), wrap(rowEntry.getValue())); + } + + return wrap2(out); + } + + @SuppressWarnings("unchecked") + public Map<Long, Map<Column, Value>> toLongMap() { + Map<Bytes, Map<Column, Bytes>> in = getInput(); + Map<Long, Map<Column, Value>> out = new HashMap<>(); + + for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) { + out.put(encoder.decodeLong(rowEntry.getKey()), wrap(rowEntry.getValue())); + } + + return wrap2(out); + } + + @SuppressWarnings("unchecked") + public Map<Integer, Map<Column, Value>> toIntegerMap() { + Map<Bytes, Map<Column, Bytes>> in = getInput(); + Map<Integer, Map<Column, Value>> out = new HashMap<>(); + + for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) { + out.put(encoder.decodeInteger(rowEntry.getKey()), wrap(rowEntry.getValue())); + } + + return wrap2(out); + } + + @SuppressWarnings("unchecked") + public Map<Bytes, Map<Column, Value>> toBytesMap() { + 
Map<Bytes, Map<Column, Bytes>> in = getInput(); + Map<Bytes, Map<Column, Value>> out = new HashMap<>(); + + for (Entry<Bytes, Map<Column, Bytes>> rowEntry : in.entrySet()) { + out.put(rowEntry.getKey(), wrap(rowEntry.getValue())); + } + + return wrap2(out); + } + } + + /** + * @since 1.0.0 + */ + public class ColumnsMethods { + private Collection<Bytes> rows; + + public ColumnsMethods(Collection<Bytes> rows) { + this.rows = rows; + } + + public MapConverter columns(Set<Column> columns) { + return new MapConverter(rows, columns); + } + + public MapConverter columns(Column... columns) { + return columns(new HashSet<>(Arrays.asList(columns))); + } + } + + /** + * @since 1.0.0 + */ + public class ValueRowMethods extends RowMethods<ValueFamilyMethods> { + + ValueRowMethods() { + tl.super(); + } + + @Override + ValueFamilyMethods create(Data data) { + return new ValueFamilyMethods(data); + } + + public ColumnsMethods rows(Collection<Bytes> rows) { + return new ColumnsMethods(rows); + } + + public ColumnsMethods rows(Bytes... rows) { + return new ColumnsMethods(Arrays.asList(rows)); + } + + public ColumnsMethods rowsString(String... rows) { + return rowsString(Arrays.asList(rows)); + } + + public ColumnsMethods rowsString(Collection<String> rows) { + ArrayList<Bytes> conv = new ArrayList<>(); + for (String row : rows) { + conv.add(encoder.encode(row)); + } + + return rows(conv); + } + + public ColumnsMethods rowsLong(Long... rows) { + return rowsLong(Arrays.asList(rows)); + } + + public ColumnsMethods rowsLong(Collection<Long> rows) { + ArrayList<Bytes> conv = new ArrayList<>(); + for (Long row : rows) { + conv.add(encoder.encode(row)); + } + + return rows(conv); + } + + public ColumnsMethods rowsInteger(Integer... 
rows) { + return rowsInteger(Arrays.asList(rows)); + } + + public ColumnsMethods rowsInteger(Collection<Integer> rows) { + ArrayList<Bytes> conv = new ArrayList<>(); + for (Integer row : rows) { + conv.add(encoder.encode(row)); + } + + return rows(conv); + } + + public ColumnsMethods rowsBytes(byte[]... rows) { + return rowsBytes(Arrays.asList(rows)); + } + + public ColumnsMethods rowsBytes(Collection<byte[]> rows) { + ArrayList<Bytes> conv = new ArrayList<>(); + for (byte[] row : rows) { + conv.add(Bytes.of(row)); + } + + return rows(conv); + } + + public ColumnsMethods rowsByteBuffers(ByteBuffer... rows) { + return rowsByteBuffers(Arrays.asList(rows)); + } + + public ColumnsMethods rowsByteBuffers(Collection<ByteBuffer> rows) { + ArrayList<Bytes> conv = new ArrayList<>(); + for (ByteBuffer row : rows) { + conv.add(Bytes.of(row)); + } + + return rows(conv); + } + + } + + TypedSnapshotBase(SnapshotBase snapshot, Encoder encoder, TypeLayer tl) { + this.snapshot = snapshot; + this.encoder = encoder; + this.tl = tl; + } + + @Override + public Bytes get(Bytes row, Column column) { + return snapshot.get(row, column); + } + + @Override + public Map<Column, Bytes> get(Bytes row, Set<Column> columns) { + return snapshot.get(row, columns); + } + + @Override + public Map<Bytes, Map<Column, Bytes>> get(Collection<RowColumn> rowColumns) { + return snapshot.get(rowColumns); + } + + @Override + public RowIterator get(ScannerConfiguration config) { + return snapshot.get(config); + } + + @Override + public Map<Bytes, Map<Column, Bytes>> get(Collection<Bytes> rows, Set<Column> columns) { + return snapshot.get(rows, columns); + } + + public ValueRowMethods get() { + return new ValueRowMethods(); + } + + @SuppressWarnings({"unchecked"}) + private Map<Column, Value> wrap(Map<Column, Bytes> map) { + Map<Column, Value> ret = Maps.transformValues(map, input -> new Value(input)); + return Collections.unmodifiableMap(DefaultedMap.decorate(ret, new Value((Bytes) null))); + } + + @Override + 
public long getStartTimestamp() { + return snapshot.getStartTimestamp(); + } + + @Override + public String gets(String row, Column column) { + return snapshot.gets(row, column); + } + + @Override + public Map<Column, String> gets(String row, Set<Column> columns) { + return snapshot.gets(row, columns); + } + + @Override + public Map<String, Map<Column, String>> gets(Collection<String> rows, Set<Column> columns) { + return snapshot.gets(rows, columns); + } + + @Override + public Map<String, Map<Column, String>> gets(Collection<RowColumn> rowColumns) { + return snapshot.gets(rowColumns); + } +}
http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java new file mode 100644 index 0000000..17631e0 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransaction.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.fluo.recipes.core.types; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.fluo.api.client.Transaction; +import org.apache.fluo.api.exceptions.CommitException; + +/** + * A {@link Transaction} that uses a {@link TypeLayer} + * + * @since 1.0.0 + */ +public class TypedTransaction extends TypedTransactionBase implements Transaction { + + private final Transaction closeTx; + + @VisibleForTesting + protected TypedTransaction(Transaction tx, Encoder encoder, TypeLayer tl) { + super(tx, encoder, tl); + closeTx = tx; + } + + @Override + public void commit() throws CommitException { + closeTx.commit(); + } + + @Override + public void close() { + closeTx.close(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java new file mode 100644 index 0000000..69ec694 --- /dev/null +++ b/modules/core/src/main/java/org/apache/fluo/recipes/core/types/TypedTransactionBase.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.fluo.recipes.core.types; + +import java.nio.ByteBuffer; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.fluo.api.client.TransactionBase; +import org.apache.fluo.api.data.Bytes; +import org.apache.fluo.api.data.Column; +import org.apache.fluo.api.exceptions.AlreadySetException; +import org.apache.fluo.recipes.core.types.TypeLayer.Data; +import org.apache.fluo.recipes.core.types.TypeLayer.FamilyMethods; +import org.apache.fluo.recipes.core.types.TypeLayer.QualifierMethods; +import org.apache.fluo.recipes.core.types.TypeLayer.RowMethods; + +/** + * A {@link TransactionBase} that uses a {@link TypeLayer} + * + * @since 1.0.0 + */ +public class TypedTransactionBase extends TypedSnapshotBase implements TransactionBase { + + private final TransactionBase tx; + private final Encoder encoder; + private final TypeLayer tl; + + /** + * @since 1.0.0 + */ + public class Mutator { + + private boolean set = false; + Data data; + + Mutator(Data data) { + this.data = data; + } + + void checkNotSet() { + if (set) { + throw new IllegalStateException("Already set value"); + } + } + + public void set(Bytes bytes) throws AlreadySetException { + checkNotSet(); + tx.set(data.row, data.getCol(), bytes); + set = true; + } + + public void set(String s) throws AlreadySetException { + set(encoder.encode(s)); + } + + public void set(int i) throws AlreadySetException { + set(encoder.encode(i)); + } + + public void set(long l) throws AlreadySetException { + set(encoder.encode(l)); + } + + public void set(float f) throws 
AlreadySetException { + set(encoder.encode(f)); + } + + public void set(double d) throws AlreadySetException { + set(encoder.encode(d)); + } + + public void set(boolean b) throws AlreadySetException { + set(encoder.encode(b)); + } + + public void set(byte[] ba) throws AlreadySetException { + set(Bytes.of(ba)); + } + + public void set(ByteBuffer bb) throws AlreadySetException { + set(Bytes.of(bb)); + } + + /** + * Set an empty value + */ + public void set() throws AlreadySetException { + set(Bytes.EMPTY); + } + + /** + * Reads the current value of the row/column, adds i, sets the sum. If the row/column does not + * have a current value, then it defaults to zero. + * + * @param i Integer increment amount + * @throws AlreadySetException if value was previously set in transaction + */ + public void increment(int i) throws AlreadySetException { + checkNotSet(); + Bytes val = tx.get(data.row, data.getCol()); + int v = 0; + if (val != null) { + v = encoder.decodeInteger(val); + } + tx.set(data.row, data.getCol(), encoder.encode(v + i)); + } + + /** + * Reads the current value of the row/column, adds l, sets the sum. If the row/column does not + * have a current value, then it defaults to zero. 
+ * + * @param l Long increment amount + * @throws AlreadySetException if value was previously set in transaction + */ + public void increment(long l) throws AlreadySetException { + checkNotSet(); + Bytes val = tx.get(data.row, data.getCol()); + long v = 0; + if (val != null) { + v = encoder.decodeLong(val); + } + tx.set(data.row, data.getCol(), encoder.encode(v + l)); + } + + public void delete() throws AlreadySetException { + checkNotSet(); + tx.delete(data.row, data.getCol()); + set = true; + } + + public void weaklyNotify() { + checkNotSet(); + tx.setWeakNotification(data.row, data.getCol()); + set = true; + } + + } + + /** + * @since 1.0.0 + */ + public class VisibilityMutator extends Mutator { + + VisibilityMutator(Data data) { + super(data); + } + + public Mutator vis(String cv) { + checkNotSet(); + data.vis = Bytes.of(cv); + return new Mutator(data); + } + + public Mutator vis(Bytes cv) { + checkNotSet(); + data.vis = cv; + return new Mutator(data); + } + + public Mutator vis(byte[] cv) { + checkNotSet(); + data.vis = Bytes.of(cv); + return new Mutator(data); + } + + public Mutator vis(ByteBuffer cv) { + checkNotSet(); + data.vis = Bytes.of(cv); + return new Mutator(data); + } + } + + /** + * @since 1.0.0 + */ + public class MutatorQualifierMethods extends QualifierMethods<VisibilityMutator> { + + MutatorQualifierMethods(Data data) { + tl.super(data); + } + + @Override + VisibilityMutator create(Data data) { + return new VisibilityMutator(data); + } + } + + /** + * @since 1.0.0 + */ + public class MutatorFamilyMethods extends FamilyMethods<MutatorQualifierMethods, Mutator> { + + MutatorFamilyMethods(Data data) { + tl.super(data); + } + + @Override + MutatorQualifierMethods create1(Data data) { + return new MutatorQualifierMethods(data); + } + + @Override + Mutator create2(Data data) { + return new Mutator(data); + } + } + + /** + * @since 1.0.0 + */ + public class MutatorRowMethods extends RowMethods<MutatorFamilyMethods> { + + MutatorRowMethods() { + 
tl.super(); + } + + @Override + MutatorFamilyMethods create(Data data) { + return new MutatorFamilyMethods(data); + } + + } + + @VisibleForTesting + protected TypedTransactionBase(TransactionBase tx, Encoder encoder, TypeLayer tl) { + super(tx, encoder, tl); + this.tx = tx; + this.encoder = encoder; + this.tl = tl; + } + + public MutatorRowMethods mutate() { + return new MutatorRowMethods(); + } + + @Override + public void set(Bytes row, Column col, Bytes value) throws AlreadySetException { + tx.set(row, col, value); + } + + @Override + public void set(String row, Column col, String value) throws AlreadySetException { + tx.set(row, col, value); + } + + @Override + public void setWeakNotification(Bytes row, Column col) { + tx.setWeakNotification(row, col); + } + + @Override + public void setWeakNotification(String row, Column col) { + tx.setWeakNotification(row, col); + } + + @Override + public void delete(Bytes row, Column col) throws AlreadySetException { + tx.delete(row, col); + } + + @Override + public void delete(String row, Column col) { + tx.delete(row, col); + } +} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java b/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java deleted file mode 100644 index 2501fa1..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/data/RowHasher.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.data; - -import java.util.ArrayList; -import java.util.List; -import java.util.regex.Pattern; - -import com.google.common.base.Preconditions; -import com.google.common.base.Strings; -import com.google.common.hash.Hashing; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.BytesBuilder; -import org.apache.fluo.recipes.common.Pirtos; - -/** - * This recipe provides code to help add a hash of the row as a prefix of the row. Using this recipe - * rows are structured like the following. - * - * <p> - * {@code <prefix>:<fixed len row hash>:<user row>} - * - * <p> - * The recipe also provides code the help generate split points and configure balancing of the - * prefix. - * - * <p> - * The project documentation has more information. 
- */ -public class RowHasher { - - private static final int HASH_LEN = 4; - - public Pirtos getTableOptimizations(int numTablets) { - - List<Bytes> splits = new ArrayList<>(numTablets - 1); - - int numSplits = numTablets - 1; - int distance = (((int) Math.pow(Character.MAX_RADIX, HASH_LEN) - 1) / numTablets) + 1; - int split = distance; - for (int i = 0; i < numSplits; i++) { - splits.add(Bytes.of(prefix - + Strings.padStart(Integer.toString(split, Character.MAX_RADIX), HASH_LEN, '0'))); - split += distance; - } - - splits.add(Bytes.of(prefix + "~")); - - - Pirtos pirtos = new Pirtos(); - pirtos.setSplits(splits); - pirtos.setTabletGroupingRegex(Pattern.quote(prefix.toString())); - - return pirtos; - } - - - private Bytes prefix; - - public RowHasher(String prefix) { - this.prefix = Bytes.of(prefix + ":"); - } - - /** - * @return Returns input with prefix and hash of input prepended. - */ - public Bytes addHash(String row) { - return addHash(Bytes.of(row)); - } - - /** - * @return Returns input with prefix and hash of input prepended. - */ - public Bytes addHash(Bytes row) { - BytesBuilder builder = Bytes.newBuilder(prefix.length() + 5 + row.length()); - builder.append(prefix); - builder.append(genHash(row)); - builder.append(":"); - builder.append(row); - return builder.toBytes(); - } - - private boolean hasHash(Bytes row) { - for (int i = prefix.length(); i < prefix.length() + HASH_LEN; i++) { - byte b = row.byteAt(i); - boolean isAlphaNum = (b >= 'a' && b <= 'z') || (b >= '0' && b <= '9'); - if (!isAlphaNum) { - return false; - } - } - - if (row.byteAt(prefix.length() - 1) != ':' || row.byteAt(prefix.length() + HASH_LEN) != ':') { - return false; - } - - return true; - } - - /** - * @return Returns input with prefix and hash stripped from beginning. 
- */ - public Bytes removeHash(Bytes row) { - Preconditions.checkArgument(row.length() >= prefix.length() + 5, - "Row is shorter than expected " + row); - Preconditions.checkArgument(row.subSequence(0, prefix.length()).equals(prefix), - "Row does not have expected prefix " + row); - Preconditions.checkArgument(hasHash(row), "Row does not have expected hash " + row); - return row.subSequence(prefix.length() + 5, row.length()); - } - - private static String genHash(Bytes row) { - int hash = Hashing.murmur3_32().hashBytes(row.toArray()).asInt(); - hash = hash & 0x7fffffff; - // base 36 gives a lot more bins in 4 bytes than hex, but it is still human readable which is - // nice for debugging. - String hashString = - Strings.padStart(Integer.toString(hash, Character.MAX_RADIX), HASH_LEN, '0'); - hashString = hashString.substring(hashString.length() - HASH_LEN); - - return hashString; - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java deleted file mode 100644 index c477ab1..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/export/Export.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
/**
 * An immutable key/value pair. Neither the key nor the value may be {@code null}.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class Export<K, V> {
  private final K key;
  private final V value;

  /**
   * @param key the key, must not be null
   * @param val the value, must not be null
   * @throws NullPointerException if either argument is null (the key is checked first)
   */
  public Export(K key, V val) {
    K checkedKey = Objects.requireNonNull(key);
    V checkedValue = Objects.requireNonNull(val);
    this.key = checkedKey;
    this.value = checkedValue;
  }

  /** @return the key given at construction, never null */
  public K getKey() {
    return this.key;
  }

  /** @return the value given at construction, never null */
  public V getValue() {
    return this.value;
  }
}
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.export; - -import java.util.Collections; -import java.util.Iterator; -import java.util.Map.Entry; - -import com.google.common.base.Preconditions; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.ScannerConfiguration; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.RowColumn; -import org.apache.fluo.api.data.Span; -import org.apache.fluo.api.iterator.ColumnIterator; -import org.apache.fluo.api.iterator.RowIterator; -import org.apache.fluo.recipes.impl.BucketUtil; -import org.apache.fluo.recipes.types.StringEncoder; -import org.apache.fluo.recipes.types.TypeLayer; -import org.apache.fluo.recipes.types.TypedTransactionBase; - -/** - * This class encapsulates a buckets serialization code. 
- */ -class ExportBucket { - private static final String NOTIFICATION_CF = "fluoRecipes"; - private static final String NOTIFICATION_CQ_PREFIX = "eq:"; - private static final Column EXPORT_COL = new Column("e", "v"); - private static final Column NEXT_COL = new Column("e", "next"); - - static Column newNotificationColumn(String queueId) { - return new Column(NOTIFICATION_CF, NOTIFICATION_CQ_PREFIX + queueId); - } - - private final TypedTransactionBase ttx; - private final String qid; - private final Bytes bucketRow; - - static Bytes generateBucketRow(String qid, int bucket, int numBuckets) { - return Bytes.of(qid + ":" + BucketUtil.genBucketId(bucket, numBuckets)); - } - - ExportBucket(TransactionBase tx, String qid, int bucket, int numBuckets) { - // TODO encode in a more robust way... but for now fail early - Preconditions.checkArgument(!qid.contains(":"), "Export QID can not contain :"); - this.ttx = new TypeLayer(new StringEncoder()).wrap(tx); - this.qid = qid; - this.bucketRow = generateBucketRow(qid, bucket, numBuckets); - } - - ExportBucket(TransactionBase tx, Bytes bucketRow) { - this.ttx = new TypeLayer(new StringEncoder()).wrap(tx); - - int colonLoc = -1; - - for (int i = 0; i < bucketRow.length(); i++) { - if (bucketRow.byteAt(i) == ':') { - colonLoc = i; - break; - } - } - - Preconditions.checkArgument(colonLoc != -1 && colonLoc != bucketRow.length(), - "Invalid bucket row " + bucketRow); - Preconditions.checkArgument(bucketRow.byteAt(bucketRow.length() - 1) == ':', - "Invalid bucket row " + bucketRow); - - this.bucketRow = bucketRow.subSequence(0, bucketRow.length() - 1); - this.qid = bucketRow.subSequence(0, colonLoc).toString(); - } - - private static byte[] encSeq(long l) { - byte[] ret = new byte[8]; - ret[0] = (byte) (l >>> 56); - ret[1] = (byte) (l >>> 48); - ret[2] = (byte) (l >>> 40); - ret[3] = (byte) (l >>> 32); - ret[4] = (byte) (l >>> 24); - ret[5] = (byte) (l >>> 16); - ret[6] = (byte) (l >>> 8); - ret[7] = (byte) (l >>> 0); - return ret; 
- } - - private static long decodeSeq(Bytes seq) { - return (((long) seq.byteAt(0) << 56) + ((long) (seq.byteAt(1) & 255) << 48) - + ((long) (seq.byteAt(2) & 255) << 40) + ((long) (seq.byteAt(3) & 255) << 32) - + ((long) (seq.byteAt(4) & 255) << 24) + ((seq.byteAt(5) & 255) << 16) - + ((seq.byteAt(6) & 255) << 8) + ((seq.byteAt(7) & 255) << 0)); - } - - - public void add(long seq, byte[] key, byte[] value) { - Bytes row = - Bytes.newBuilder(bucketRow.length() + 1 + key.length + 8).append(bucketRow).append(":") - .append(key).append(encSeq(seq)).toBytes(); - ttx.set(row, EXPORT_COL, Bytes.of(value)); - } - - /** - * Computes the minimial row for a bucket - */ - private Bytes getMinimalRow() { - return Bytes.newBuilder(bucketRow.length() + 1).append(bucketRow).append(":").toBytes(); - } - - public void notifyExportObserver() { - ttx.mutate().row(getMinimalRow()).col(newNotificationColumn(qid)).weaklyNotify(); - } - - public Iterator<ExportEntry> getExportIterator(Bytes continueRow) { - ScannerConfiguration sc = new ScannerConfiguration(); - - if (continueRow != null) { - Span tmpSpan = Span.prefix(bucketRow); - Span nextSpan = - new Span(new RowColumn(continueRow, EXPORT_COL), true, tmpSpan.getEnd(), - tmpSpan.isEndInclusive()); - sc.setSpan(nextSpan); - } else { - sc.setSpan(Span.prefix(bucketRow)); - } - - sc.fetchColumn(EXPORT_COL.getFamily(), EXPORT_COL.getQualifier()); - RowIterator iter = ttx.get(sc); - - if (iter.hasNext()) { - return new ExportIterator(iter); - } else { - return Collections.<ExportEntry>emptySet().iterator(); - } - } - - private class ExportIterator implements Iterator<ExportEntry> { - - private RowIterator rowIter; - private Bytes lastRow; - - public ExportIterator(RowIterator rowIter) { - this.rowIter = rowIter; - } - - @Override - public boolean hasNext() { - return rowIter.hasNext(); - } - - @Override - public ExportEntry next() { - Entry<Bytes, ColumnIterator> rowCol = rowIter.next(); - Bytes row = rowCol.getKey(); - - Bytes keyBytes = 
row.subSequence(bucketRow.length() + 1, row.length() - 8); - Bytes seqBytes = row.subSequence(row.length() - 8, row.length()); - - ExportEntry ee = new ExportEntry(); - - ee.key = keyBytes.toArray(); - ee.seq = decodeSeq(seqBytes); - // TODO maybe leave as Bytes? - ee.value = rowCol.getValue().next().getValue().toArray(); - - lastRow = row; - - return ee; - } - - @Override - public void remove() { - ttx.mutate().row(lastRow).col(EXPORT_COL).delete(); - } - } - - public Bytes getContinueRow() { - return ttx.get(getMinimalRow(), NEXT_COL); - } - - public void setContinueRow(ExportEntry ee) { - Bytes nextRow = - Bytes.newBuilder(bucketRow.length() + 1 + ee.key.length + 8).append(bucketRow).append(":") - .append(ee.key).append(encSeq(ee.seq)).toBytes(); - - ttx.set(getMinimalRow(), NEXT_COL, nextRow); - } - - public void clearContinueRow() { - ttx.delete(getMinimalRow(), NEXT_COL); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java deleted file mode 100644 index 1b156b9..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportEntry.java +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.export; - -class ExportEntry { - byte[] key; - long seq; - byte[] value; -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java deleted file mode 100644 index 972af6e..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportObserver.java +++ /dev/null @@ -1,140 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.export; - -import java.util.Iterator; -import java.util.NoSuchElementException; - -import com.google.common.collect.Iterators; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.observer.AbstractObserver; -import org.apache.fluo.recipes.serialization.SimpleSerializer; - -public class ExportObserver<K, V> extends AbstractObserver { - - private static class MemLimitIterator implements Iterator<ExportEntry> { - - private long memConsumed = 0; - private long memLimit; - private int extraPerKey; - private Iterator<ExportEntry> source; - - public MemLimitIterator(Iterator<ExportEntry> input, long limit, int extraPerKey) { - this.source = input; - this.memLimit = limit; - this.extraPerKey = extraPerKey; - } - - @Override - public boolean hasNext() { - return memConsumed < memLimit && source.hasNext(); - } - - @Override - public ExportEntry next() { - if (!hasNext()) { - throw new NoSuchElementException(); - } - ExportEntry ee = source.next(); - memConsumed += ee.key.length + extraPerKey + ee.value.length; - return ee; - } - - @Override - public void remove() { - source.remove(); - } - } - - private String queueId; - private Class<K> keyType; - private Class<V> valType; - SimpleSerializer serializer; - private Exporter<K, V> exporter; - - private long memLimit; - - protected String getQueueId() { - return queueId; - } - - SimpleSerializer getSerializer() { - return serializer; - } - - @SuppressWarnings("unchecked") - @Override - public void init(Context context) throws Exception { - queueId = context.getParameters().get("queueId"); - ExportQueue.Options opts = new ExportQueue.Options(queueId, context.getAppConfiguration()); - - // TODO defer loading classes... so that not done during fluo init - // TODO move class loading to centralized place... 
also attempt to check type params - keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType); - valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType); - exporter = - getClass().getClassLoader().loadClass(opts.exporterType).asSubclass(Exporter.class) - .newInstance(); - - serializer = SimpleSerializer.getInstance(context.getAppConfiguration()); - - memLimit = opts.getBufferSize(); - - exporter.init(queueId, context); - } - - @Override - public ObservedColumn getObservedColumn() { - return new ObservedColumn(ExportBucket.newNotificationColumn(queueId), NotificationType.WEAK); - } - - @Override - public void process(TransactionBase tx, Bytes row, Column column) throws Exception { - ExportBucket bucket = new ExportBucket(tx, row); - - Bytes continueRow = bucket.getContinueRow(); - - Iterator<ExportEntry> input = bucket.getExportIterator(continueRow); - MemLimitIterator memLimitIter = new MemLimitIterator(input, memLimit, 8 + queueId.length()); - - Iterator<SequencedExport<K, V>> exportIterator = - Iterators.transform( - memLimitIter, - ee -> new SequencedExport<>(serializer.deserialize(ee.key, keyType), serializer - .deserialize(ee.value, valType), ee.seq)); - - exportIterator = Iterators.consumingIterator(exportIterator); - - exporter.processExports(exportIterator); - - if (input.hasNext()) { - // not everything was processed so notify self - bucket.notifyExportObserver(); - - if (!memLimitIter.hasNext()) { - // stopped because of mem limit... 
set continue key - bucket.setContinueRow(input.next()); - continueRow = null; - } - } - - if (continueRow != null) { - bucket.clearContinueRow(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java deleted file mode 100644 index 13518e7..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/export/ExportQueue.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.export; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Set; -import java.util.regex.Pattern; - -import com.google.common.base.Preconditions; -import com.google.common.hash.Hashing; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverConfiguration; -import org.apache.fluo.api.config.SimpleConfiguration; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.recipes.common.Pirtos; -import org.apache.fluo.recipes.common.RowRange; -import org.apache.fluo.recipes.common.TransientRegistry; -import org.apache.fluo.recipes.serialization.SimpleSerializer; - -public class ExportQueue<K, V> { - - private static final String RANGE_BEGIN = "#"; - private static final String RANGE_END = ":~"; - - private int numBuckets; - private SimpleSerializer serializer; - private String queueId; - - // usage hint : could be created once in an observers init method - // usage hint : maybe have a queue for each type of data being exported??? - // maybe less queues are - // more efficient though because more batching at export time?? - ExportQueue(Options opts, SimpleSerializer serializer) throws Exception { - // TODO sanity check key type based on type params - // TODO defer creating classes until needed.. 
so that its not done during Fluo init - this.queueId = opts.queueId; - this.numBuckets = opts.numBuckets; - this.serializer = serializer; - } - - public void add(TransactionBase tx, K key, V value) { - addAll(tx, Collections.singleton(new Export<>(key, value)).iterator()); - } - - public void addAll(TransactionBase tx, Iterator<Export<K, V>> exports) { - - Set<Integer> bucketsNotified = new HashSet<>(); - while (exports.hasNext()) { - Export<K, V> export = exports.next(); - - byte[] k = serializer.serialize(export.getKey()); - byte[] v = serializer.serialize(export.getValue()); - - int hash = Hashing.murmur3_32().hashBytes(k).asInt(); - int bucketId = Math.abs(hash % numBuckets); - - ExportBucket bucket = new ExportBucket(tx, queueId, bucketId, numBuckets); - bucket.add(tx.getStartTimestamp(), k, v); - - if (!bucketsNotified.contains(bucketId)) { - bucket.notifyExportObserver(); - bucketsNotified.add(bucketId); - } - } - } - - public static <K2, V2> ExportQueue<K2, V2> getInstance(String exportQueueId, - SimpleConfiguration appConfig) { - Options opts = new Options(exportQueueId, appConfig); - try { - return new ExportQueue<>(opts, SimpleSerializer.getInstance(appConfig)); - } catch (Exception e) { - // TODO - throw new RuntimeException(e); - } - } - - /** - * Call this method before initializing Fluo. - * - * @param fluoConfig The configuration that will be used to initialize fluo. - */ - public static void configure(FluoConfiguration fluoConfig, Options opts) { - SimpleConfiguration appConfig = fluoConfig.getAppConfiguration(); - opts.save(appConfig); - - fluoConfig.addObserver(new ObserverConfiguration(ExportObserver.class.getName()) - .setParameters(Collections.singletonMap("queueId", opts.queueId))); - - Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN); - Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END); - - new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("exportQueue." 
- + opts.queueId, new RowRange(exportRangeStart, exportRangeStop)); - } - - /** - * Return suggested Fluo table optimizations for all previously configured export queues. - * - * @param appConfig Must pass in the application configuration obtained from - * {@code FluoClient.getAppConfiguration()} or - * {@code FluoConfiguration.getAppConfiguration()} - */ - - public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) { - HashSet<String> queueIds = new HashSet<>(); - appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining( - k -> queueIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0])); - - Pirtos pirtos = new Pirtos(); - queueIds.forEach(qid -> pirtos.merge(getTableOptimizations(qid, appConfig))); - - return pirtos; - } - - /** - * Return suggested Fluo table optimizations for the specified export queue. - * - * @param appConfig Must pass in the application configuration obtained from - * {@code FluoClient.getAppConfiguration()} or - * {@code FluoConfiguration.getAppConfiguration()} - */ - public static Pirtos getTableOptimizations(String queueId, SimpleConfiguration appConfig) { - Options opts = new Options(queueId, appConfig); - - List<Bytes> splits = new ArrayList<>(); - - Bytes exportRangeStart = Bytes.of(opts.queueId + RANGE_BEGIN); - Bytes exportRangeStop = Bytes.of(opts.queueId + RANGE_END); - - splits.add(exportRangeStart); - splits.add(exportRangeStop); - - List<Bytes> exportSplits = new ArrayList<>(); - for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) { - exportSplits.add(ExportBucket.generateBucketRow(opts.queueId, i, opts.numBuckets)); - } - Collections.sort(exportSplits); - splits.addAll(exportSplits); - - Pirtos pirtos = new Pirtos(); - pirtos.setSplits(splits); - - // the tablet with end row <queueId># does not contain any data for the export queue and - // should not be grouped with the export queue - 
pirtos.setTabletGroupingRegex(Pattern.quote(queueId + ":")); - - return pirtos; - } - - public static class Options { - - private static final String PREFIX = "recipes.exportQueue."; - static final long DEFAULT_BUFFER_SIZE = 1 << 20; - static final int DEFAULT_BUCKETS_PER_TABLET = 10; - - int numBuckets; - Integer bucketsPerTablet = null; - Long bufferSize; - - String keyType; - String valueType; - String exporterType; - String queueId; - - Options(String queueId, SimpleConfiguration appConfig) { - this.queueId = queueId; - - this.numBuckets = appConfig.getInt(PREFIX + queueId + ".buckets"); - this.exporterType = appConfig.getString(PREFIX + queueId + ".exporter"); - this.keyType = appConfig.getString(PREFIX + queueId + ".key"); - this.valueType = appConfig.getString(PREFIX + queueId + ".val"); - this.bufferSize = appConfig.getLong(PREFIX + queueId + ".bufferSize", DEFAULT_BUFFER_SIZE); - this.bucketsPerTablet = - appConfig.getInt(PREFIX + queueId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET); - } - - public Options(String queueId, String exporterType, String keyType, String valueType, - int buckets) { - Preconditions.checkArgument(buckets > 0); - - this.queueId = queueId; - this.numBuckets = buckets; - this.exporterType = exporterType; - this.keyType = keyType; - this.valueType = valueType; - } - - - public <K, V> Options(String queueId, Class<? extends Exporter<K, V>> exporter, - Class<K> keyType, Class<V> valueType, int buckets) { - this(queueId, exporter.getName(), keyType.getName(), valueType.getName(), buckets); - } - - /** - * Sets a limit on the amount of serialized updates to read into memory. Additional memory will - * be used to actually deserialize and process the updates. This limit does not account for - * object overhead in java, which can be significant. - * - * <p> - * The way memory read is calculated is by summing the length of serialized key and value byte - * arrays. 
Once this sum exceeds the configured memory limit, no more export key values are - * processed in the current transaction. When not everything is processed, the observer - * processing exports will notify itself causing another transaction to continue processing - * later. - */ - public Options setBufferSize(long bufferSize) { - Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive"); - this.bufferSize = bufferSize; - return this; - } - - long getBufferSize() { - if (bufferSize == null) { - return DEFAULT_BUFFER_SIZE; - } - - return bufferSize; - } - - /** - * Sets the number of buckets per tablet to generate. This affects how many split points will be - * generated when optimizing the Accumulo table. - * - */ - public Options setBucketsPerTablet(int bucketsPerTablet) { - Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : " - + bucketsPerTablet); - this.bucketsPerTablet = bucketsPerTablet; - return this; - } - - int getBucketsPerTablet() { - if (bucketsPerTablet == null) { - return DEFAULT_BUCKETS_PER_TABLET; - } - - return bucketsPerTablet; - } - - void save(SimpleConfiguration appConfig) { - appConfig.setProperty(PREFIX + queueId + ".buckets", numBuckets + ""); - appConfig.setProperty(PREFIX + queueId + ".exporter", exporterType + ""); - appConfig.setProperty(PREFIX + queueId + ".key", keyType); - appConfig.setProperty(PREFIX + queueId + ".val", valueType); - - if (bufferSize != null) { - appConfig.setProperty(PREFIX + queueId + ".bufferSize", bufferSize); - } - if (bucketsPerTablet != null) { - appConfig.setProperty(PREFIX + queueId + ".bucketsPerTablet", bucketsPerTablet); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java 
b/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java deleted file mode 100644 index b81e9d1..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/export/Exporter.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.export; - -import java.util.Iterator; - -import org.apache.fluo.api.observer.Observer.Context; - -public abstract class Exporter<K, V> { - - public void init(String queueId, Context observerContext) throws Exception {} - - /** - * Must be able to handle same key being exported multiple times and key being exported out of - * order. The sequence number is meant to help with this. - * - * <p> - * If multiple export entries with the same key are passed in, then the entries with the same key - * will be consecutive and in ascending sequence order. - * - * <p> - * If the call to process exports is unexpectedly terminated, it will be called again later with - * at least the same data. For example suppose an exporter was passed the following entries. 
- * - * <ul> - * <li>key=0 sequence=9 value=abc - * <li>key=1 sequence=13 value=d - * <li>key=1 sequence=17 value=e - * <li>key=1 sequence=23 value=f - * <li>key=2 sequence=19 value=x - * </ul> - * - * <p> - * Assume the exporter exports some of these and then fails before completing all of them. The - * next time its called it will be passed what it saw before, but it could also be passed more. - * - * <ul> - * <li>key=0 sequence=9 value=abc - * <li>key=1 sequence=13 value=d - * <li>key=1 sequence=17 value=e - * <li>key=1 sequence=23 value=f - * <li>key=1 sequence=29 value=g - * <li>key=2 sequence=19 value=x - * <li>key=2 sequence=77 value=y - * </ul> - * - */ - protected abstract void processExports(Iterator<SequencedExport<K, V>> exports); - - // TODO add close -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java b/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java deleted file mode 100644 index a862a8e..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/export/SequencedExport.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. 
You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.export; - -public class SequencedExport<K, V> extends Export<K, V> { - private final long seq; - - SequencedExport(K k, V v, long seq) { - super(k, v); - this.seq = seq; - } - - public long getSequence() { - return seq; - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java b/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java deleted file mode 100644 index ded289c..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/impl/BucketUtil.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. 
See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.recipes.impl; - -public class BucketUtil { - public static String genBucketId(int bucket, int maxBucket) { - int bucketLen = Integer.toHexString(maxBucket).length(); - // TODO printf is slow - return String.format("%0" + bucketLen + "x", bucket); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo-recipes/blob/beea3f96/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java b/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java deleted file mode 100644 index bc7bffd..0000000 --- a/modules/core/src/main/java/org/apache/fluo/recipes/map/CollisionFreeMap.java +++ /dev/null @@ -1,657 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. 
- */ - -package org.apache.fluo.recipes.map; - -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.Set; -import java.util.regex.Pattern; - -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.Iterators; -import com.google.common.collect.Sets; -import com.google.common.hash.Hashing; -import org.apache.fluo.api.client.SnapshotBase; -import org.apache.fluo.api.client.TransactionBase; -import org.apache.fluo.api.config.FluoConfiguration; -import org.apache.fluo.api.config.ObserverConfiguration; -import org.apache.fluo.api.config.ScannerConfiguration; -import org.apache.fluo.api.config.SimpleConfiguration; -import org.apache.fluo.api.data.Bytes; -import org.apache.fluo.api.data.BytesBuilder; -import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.RowColumn; -import org.apache.fluo.api.data.RowColumnValue; -import org.apache.fluo.api.data.Span; -import org.apache.fluo.api.iterator.ColumnIterator; -import org.apache.fluo.api.iterator.RowIterator; -import org.apache.fluo.recipes.common.Pirtos; -import org.apache.fluo.recipes.common.RowRange; -import org.apache.fluo.recipes.common.TransientRegistry; -import org.apache.fluo.recipes.impl.BucketUtil; -import org.apache.fluo.recipes.serialization.SimpleSerializer; - -/** - * See the project level documentation for information about this recipe. 
- */ -public class CollisionFreeMap<K, V> { - - private static final String UPDATE_RANGE_END = ":u:~"; - - private static final String DATA_RANGE_END = ":d:~"; - - private String mapId; - - private Class<K> keyType; - private Class<V> valType; - private SimpleSerializer serializer; - private Combiner<K, V> combiner; - UpdateObserver<K, V> updateObserver; - private long bufferSize; - - static final Column UPDATE_COL = new Column("u", "v"); - static final Column NEXT_COL = new Column("u", "next"); - - private int numBuckets = -1; - - @SuppressWarnings("unchecked") - CollisionFreeMap(Options opts, SimpleSerializer serializer) throws Exception { - - this.mapId = opts.mapId; - // TODO defer loading classes - // TODO centralize class loading - // TODO try to check type params - this.numBuckets = opts.numBuckets; - this.keyType = (Class<K>) getClass().getClassLoader().loadClass(opts.keyType); - this.valType = (Class<V>) getClass().getClassLoader().loadClass(opts.valueType); - this.combiner = - (Combiner<K, V>) getClass().getClassLoader().loadClass(opts.combinerType).newInstance(); - this.serializer = serializer; - if (opts.updateObserverType != null) { - this.updateObserver = - getClass().getClassLoader().loadClass(opts.updateObserverType) - .asSubclass(UpdateObserver.class).newInstance(); - } else { - this.updateObserver = new NullUpdateObserver<>(); - } - this.bufferSize = opts.getBufferSize(); - } - - private V deserVal(Bytes val) { - return serializer.deserialize(val.toArray(), valType); - } - - private Bytes getKeyFromUpdateRow(Bytes prefix, Bytes row) { - return row.subSequence(prefix.length(), row.length() - 8); - } - - void process(TransactionBase tx, Bytes ntfyRow, Column col) throws Exception { - - Bytes nextKey = tx.get(ntfyRow, NEXT_COL); - - ScannerConfiguration sc = new ScannerConfiguration(); - - if (nextKey != null) { - Bytes startRow = - Bytes.newBuilder(ntfyRow.length() + nextKey.length()).append(ntfyRow).append(nextKey) - .toBytes(); - Span tmpSpan = 
Span.prefix(ntfyRow); - Span nextSpan = - new Span(new RowColumn(startRow, UPDATE_COL), false, tmpSpan.getEnd(), - tmpSpan.isEndInclusive()); - sc.setSpan(nextSpan); - } else { - sc.setSpan(Span.prefix(ntfyRow)); - } - - sc.setSpan(Span.prefix(ntfyRow)); - sc.fetchColumn(UPDATE_COL.getFamily(), UPDATE_COL.getQualifier()); - RowIterator iter = tx.get(sc); - - Map<Bytes, List<Bytes>> updates = new HashMap<>(); - - long approxMemUsed = 0; - - Bytes partiallyReadKey = null; - - if (iter.hasNext()) { - Bytes lastKey = null; - while (iter.hasNext() && approxMemUsed < bufferSize) { - Entry<Bytes, ColumnIterator> rowCol = iter.next(); - Bytes curRow = rowCol.getKey(); - - tx.delete(curRow, UPDATE_COL); - - Bytes serializedKey = getKeyFromUpdateRow(ntfyRow, curRow); - lastKey = serializedKey; - - List<Bytes> updateList = updates.get(serializedKey); - if (updateList == null) { - updateList = new ArrayList<>(); - updates.put(serializedKey, updateList); - } - - Bytes val = rowCol.getValue().next().getValue(); - updateList.add(val); - - approxMemUsed += curRow.length(); - approxMemUsed += val.length(); - } - - if (iter.hasNext()) { - Entry<Bytes, ColumnIterator> rowCol = iter.next(); - Bytes curRow = rowCol.getKey(); - - // check if more updates for last key - if (getKeyFromUpdateRow(ntfyRow, curRow).equals(lastKey)) { - // there are still more updates for this key - partiallyReadKey = lastKey; - - // start next time at the current key - tx.set(ntfyRow, NEXT_COL, partiallyReadKey); - } else { - // start next time at the next possible key - Bytes nextPossible = - Bytes.newBuilder(lastKey.length() + 1).append(lastKey).append(new byte[] {0}) - .toBytes(); - tx.set(ntfyRow, NEXT_COL, nextPossible); - } - - // may not read all data because of mem limit, so notify self - tx.setWeakNotification(ntfyRow, col); - } else if (nextKey != null) { - // clear nextKey - tx.delete(ntfyRow, NEXT_COL); - } - } else if (nextKey != null) { - tx.delete(ntfyRow, NEXT_COL); - } - - byte[] dataPrefix = 
ntfyRow.toArray(); - // TODO this is awful... no sanity check... hard to read - dataPrefix[Bytes.of(mapId).length() + 1] = 'd'; - - BytesBuilder rowBuilder = Bytes.newBuilder(); - rowBuilder.append(dataPrefix); - int rowPrefixLen = rowBuilder.getLength(); - - Set<Bytes> keysToFetch = updates.keySet(); - if (partiallyReadKey != null) { - final Bytes prk = partiallyReadKey; - keysToFetch = Sets.filter(keysToFetch, b -> !b.equals(prk)); - } - Map<Bytes, Map<Column, Bytes>> currentVals = getCurrentValues(tx, rowBuilder, keysToFetch); - - ArrayList<Update<K, V>> updatesToReport = new ArrayList<>(updates.size()); - - for (Entry<Bytes, List<Bytes>> entry : updates.entrySet()) { - rowBuilder.setLength(rowPrefixLen); - Bytes currentValueRow = rowBuilder.append(entry.getKey()).toBytes(); - Bytes currVal = - currentVals.getOrDefault(currentValueRow, Collections.emptyMap()).get(DATA_COLUMN); - - Iterator<V> ui = Iterators.transform(entry.getValue().iterator(), this::deserVal); - - K kd = serializer.deserialize(entry.getKey().toArray(), keyType); - - if (partiallyReadKey != null && partiallyReadKey.equals(entry.getKey())) { - // not all updates were read for this key, so requeue the combined updates as an update - Optional<V> nv = combiner.combine(kd, ui); - if (nv.isPresent()) { - update(tx, Collections.singletonMap(kd, nv.get())); - } - } else { - Optional<V> nv = combiner.combine(kd, concat(ui, currVal)); - Bytes newVal = nv.isPresent() ? 
Bytes.of(serializer.serialize(nv.get())) : null; - if (newVal != null ^ currVal != null || (currVal != null && !currVal.equals(newVal))) { - if (newVal == null) { - tx.delete(currentValueRow, DATA_COLUMN); - } else { - tx.set(currentValueRow, DATA_COLUMN, newVal); - } - - Optional<V> cvd = Optional.ofNullable(currVal).map(this::deserVal); - updatesToReport.add(new Update<>(kd, cvd, nv)); - } - } - } - - // TODO could clear these as converted to objects to avoid double memory usage - updates.clear(); - currentVals.clear(); - - if (updatesToReport.size() > 0) { - updateObserver.updatingValues(tx, updatesToReport.iterator()); - } - } - - private static final Column DATA_COLUMN = new Column("data", "current"); - - private Map<Bytes, Map<Column, Bytes>> getCurrentValues(TransactionBase tx, BytesBuilder prefix, - Set<Bytes> keySet) { - - Set<Bytes> rows = new HashSet<>(); - - int prefixLen = prefix.getLength(); - for (Bytes key : keySet) { - prefix.setLength(prefixLen); - rows.add(prefix.append(key).toBytes()); - } - - try { - return tx.get(rows, Collections.singleton(DATA_COLUMN)); - } catch (IllegalArgumentException e) { - System.out.println(rows.size()); - throw e; - } - } - - private Iterator<V> concat(Iterator<V> updates, Bytes currentVal) { - if (currentVal == null) { - return updates; - } - - return Iterators.concat(updates, Iterators.singletonIterator(deserVal(currentVal))); - } - - /** - * This method will retrieve the current value for key and any outstanding updates and combine - * them using the configured {@link Combiner}. The result from the combiner is returned. 
- */ - public V get(SnapshotBase tx, K key) { - - byte[] k = serializer.serialize(key); - - int hash = Hashing.murmur3_32().hashBytes(k).asInt(); - String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets); - - - BytesBuilder rowBuilder = Bytes.newBuilder(); - rowBuilder.append(mapId).append(":u:").append(bucketId).append(":").append(k); - - ScannerConfiguration sc = new ScannerConfiguration(); - sc.setSpan(Span.prefix(rowBuilder.toBytes())); - - RowIterator iter = tx.get(sc); - - Iterator<V> ui; - - if (iter.hasNext()) { - ui = Iterators.transform(iter, e -> deserVal(e.getValue().next().getValue())); - } else { - ui = Collections.<V>emptyList().iterator(); - } - - rowBuilder.setLength(mapId.length()); - rowBuilder.append(":d:").append(bucketId).append(":").append(k); - - Bytes dataRow = rowBuilder.toBytes(); - - Bytes cv = tx.get(dataRow, DATA_COLUMN); - - if (!ui.hasNext()) { - if (cv == null) { - return null; - } else { - return deserVal(cv); - } - } - - return combiner.combine(key, concat(ui, cv)).orElse(null); - } - - String getId() { - return mapId; - } - - /** - * Queues updates for a collision free map. These updates will be made by an Observer executing - * another transaction. This method will not collide with other transaction queuing updates for - * the same keys. - * - * @param tx This transaction will be used to make the updates. - * @param updates The keys in the map should correspond to keys in the collision free map being - * updated. The values in the map will be queued for updating. 
- */ - public void update(TransactionBase tx, Map<K, V> updates) { - Preconditions.checkState(numBuckets > 0, "Not initialized"); - - Set<String> buckets = new HashSet<>(); - - BytesBuilder rowBuilder = Bytes.newBuilder(); - rowBuilder.append(mapId).append(":u:"); - int prefixLength = rowBuilder.getLength(); - - byte[] startTs = encSeq(tx.getStartTimestamp()); - - for (Entry<K, V> entry : updates.entrySet()) { - byte[] k = serializer.serialize(entry.getKey()); - int hash = Hashing.murmur3_32().hashBytes(k).asInt(); - String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets); - - // reset to the common row prefix - rowBuilder.setLength(prefixLength); - - Bytes row = rowBuilder.append(bucketId).append(":").append(k).append(startTs).toBytes(); - Bytes val = Bytes.of(serializer.serialize(entry.getValue())); - - // TODO set if not exists would be comforting here.... but - // collisions on bucketId+key+uuid should never occur - tx.set(row, UPDATE_COL, val); - - buckets.add(bucketId); - } - - for (String bucketId : buckets) { - rowBuilder.setLength(prefixLength); - rowBuilder.append(bucketId).append(":"); - - Bytes row = rowBuilder.toBytes(); - - tx.setWeakNotification(row, new Column("fluoRecipes", "cfm:" + mapId)); - } - } - - - public static <K2, V2> CollisionFreeMap<K2, V2> getInstance(String mapId, - SimpleConfiguration appConf) { - Options opts = new Options(mapId, appConf); - try { - return new CollisionFreeMap<>(opts, SimpleSerializer.getInstance(appConf)); - } catch (Exception e) { - // TODO - throw new RuntimeException(e); - } - } - - /** - * A @link {@link CollisionFreeMap} stores data in its own data format in the Fluo table. When - * initializing a Fluo table with something like Map Reduce or Spark, data will need to be written - * in this format. Thats the purpose of this method, it provide a simple class that can do this - * conversion. 
- * - */ - public static <K2, V2> Initializer<K2, V2> getInitializer(String mapId, int numBuckets, - SimpleSerializer serializer) { - return new Initializer<>(mapId, numBuckets, serializer); - } - - - /** - * @see CollisionFreeMap#getInitializer(String, int, SimpleSerializer) - */ - public static class Initializer<K2, V2> implements Serializable { - - private static final long serialVersionUID = 1L; - - private String mapId; - - private SimpleSerializer serializer; - - private int numBuckets = -1; - - private Initializer(String mapId, int numBuckets, SimpleSerializer serializer) { - this.mapId = mapId; - this.numBuckets = numBuckets; - this.serializer = serializer; - } - - public RowColumnValue convert(K2 key, V2 val) { - byte[] k = serializer.serialize(key); - int hash = Hashing.murmur3_32().hashBytes(k).asInt(); - String bucketId = BucketUtil.genBucketId(Math.abs(hash % numBuckets), numBuckets); - - BytesBuilder bb = Bytes.newBuilder(); - Bytes row = bb.append(mapId).append(":d:").append(bucketId).append(":").append(k).toBytes(); - byte[] v = serializer.serialize(val); - - return new RowColumnValue(row, DATA_COLUMN, Bytes.of(v)); - } - } - - public static class Options { - - static final long DEFAULT_BUFFER_SIZE = 1 << 22; - static final int DEFAULT_BUCKETS_PER_TABLET = 10; - - int numBuckets; - Integer bucketsPerTablet = null; - - Long bufferSize; - - String keyType; - String valueType; - String combinerType; - String updateObserverType; - String mapId; - - private static final String PREFIX = "recipes.cfm."; - - Options(String mapId, SimpleConfiguration appConfig) { - this.mapId = mapId; - - this.numBuckets = appConfig.getInt(PREFIX + mapId + ".buckets"); - this.combinerType = appConfig.getString(PREFIX + mapId + ".combiner"); - this.keyType = appConfig.getString(PREFIX + mapId + ".key"); - this.valueType = appConfig.getString(PREFIX + mapId + ".val"); - this.updateObserverType = appConfig.getString(PREFIX + mapId + ".updateObserver", null); - this.bufferSize = 
appConfig.getLong(PREFIX + mapId + ".bufferSize", DEFAULT_BUFFER_SIZE); - this.bucketsPerTablet = - appConfig.getInt(PREFIX + mapId + ".bucketsPerTablet", DEFAULT_BUCKETS_PER_TABLET); - } - - public Options(String mapId, String combinerType, String keyType, String valType, int buckets) { - Preconditions.checkArgument(buckets > 0); - Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'"); - - this.mapId = mapId; - this.numBuckets = buckets; - this.combinerType = combinerType; - this.updateObserverType = null; - this.keyType = keyType; - this.valueType = valType; - } - - public Options(String mapId, String combinerType, String updateObserverType, String keyType, - String valueType, int buckets) { - Preconditions.checkArgument(buckets > 0); - Preconditions.checkArgument(!mapId.contains(":"), "Map id cannot contain ':'"); - - this.mapId = mapId; - this.numBuckets = buckets; - this.combinerType = combinerType; - this.updateObserverType = updateObserverType; - this.keyType = keyType; - this.valueType = valueType; - } - - /** - * Sets a limit on the amount of serialized updates to read into memory. Additional memory will - * be used to actually deserialize and process the updates. This limit does not account for - * object overhead in java, which can be significant. - * - * <p> - * The way memory read is calculated is by summing the length of serialized key and value byte - * arrays. Once this sum exceeds the configured memory limit, no more update key values are - * processed in the current transaction. 
When not everything is processed, the observer - * processing updates will notify itself causing another transaction to continue processing - * later - */ - public Options setBufferSize(long bufferSize) { - Preconditions.checkArgument(bufferSize > 0, "Buffer size must be positive"); - this.bufferSize = bufferSize; - return this; - } - - long getBufferSize() { - if (bufferSize == null) { - return DEFAULT_BUFFER_SIZE; - } - - return bufferSize; - } - - /** - * Sets the number of buckets per tablet to generate. This affects how many split points will be - * generated when optimizing the Accumulo table. - * - */ - public Options setBucketsPerTablet(int bucketsPerTablet) { - Preconditions.checkArgument(bucketsPerTablet > 0, "bucketsPerTablet is <= 0 : " - + bucketsPerTablet); - this.bucketsPerTablet = bucketsPerTablet; - return this; - } - - int getBucketsPerTablet() { - if (bucketsPerTablet == null) { - return DEFAULT_BUCKETS_PER_TABLET; - } - - return bucketsPerTablet; - } - - public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner, Class<K> keyType, - Class<V> valueType, int buckets) { - this(mapId, combiner.getName(), keyType.getName(), valueType.getName(), buckets); - } - - public <K, V> Options(String mapId, Class<? extends Combiner<K, V>> combiner, - Class<? 
extends UpdateObserver<K, V>> updateObserver, Class<K> keyType, Class<V> valueType, - int buckets) { - this(mapId, combiner.getName(), updateObserver.getName(), keyType.getName(), valueType - .getName(), buckets); - } - - void save(SimpleConfiguration appConfig) { - appConfig.setProperty(PREFIX + mapId + ".buckets", numBuckets + ""); - appConfig.setProperty(PREFIX + mapId + ".combiner", combinerType + ""); - appConfig.setProperty(PREFIX + mapId + ".key", keyType); - appConfig.setProperty(PREFIX + mapId + ".val", valueType); - if (updateObserverType != null) { - appConfig.setProperty(PREFIX + mapId + ".updateObserver", updateObserverType + ""); - } - if (bufferSize != null) { - appConfig.setProperty(PREFIX + mapId + ".bufferSize", bufferSize); - } - if (bucketsPerTablet != null) { - appConfig.setProperty(PREFIX + mapId + ".bucketsPerTablet", bucketsPerTablet); - } - } - } - - /** - * This method configures a collision free map for use. It must be called before initializing - * Fluo. - */ - public static void configure(FluoConfiguration fluoConfig, Options opts) { - opts.save(fluoConfig.getAppConfiguration()); - fluoConfig.addObserver(new ObserverConfiguration(CollisionFreeMapObserver.class.getName()) - .setParameters(ImmutableMap.of("mapId", opts.mapId))); - - Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END); - Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END); - - new TransientRegistry(fluoConfig.getAppConfiguration()).addTransientRange("cfm." + opts.mapId, - new RowRange(dataRangeEnd, updateRangeEnd)); - } - - /** - * Return suggested Fluo table optimizations for all previously configured collision free maps. 
- * - * @param appConfig Must pass in the application configuration obtained from - * {@code FluoClient.getAppConfiguration()} or - * {@code FluoConfiguration.getAppConfiguration()} - */ - public static Pirtos getTableOptimizations(SimpleConfiguration appConfig) { - HashSet<String> mapIds = new HashSet<>(); - appConfig.getKeys(Options.PREFIX.substring(0, Options.PREFIX.length() - 1)).forEachRemaining( - k -> mapIds.add(k.substring(Options.PREFIX.length()).split("\\.", 2)[0])); - - Pirtos pirtos = new Pirtos(); - mapIds.forEach(mid -> pirtos.merge(getTableOptimizations(mid, appConfig))); - - return pirtos; - } - - /** - * Return suggested Fluo table optimizations for the specified collisiong free map. - * - * @param appConfig Must pass in the application configuration obtained from - * {@code FluoClient.getAppConfiguration()} or - * {@code FluoConfiguration.getAppConfiguration()} - */ - public static Pirtos getTableOptimizations(String mapId, SimpleConfiguration appConfig) { - Options opts = new Options(mapId, appConfig); - - BytesBuilder rowBuilder = Bytes.newBuilder(); - rowBuilder.append(mapId); - - List<Bytes> dataSplits = new ArrayList<>(); - for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) { - String bucketId = BucketUtil.genBucketId(i, opts.numBuckets); - rowBuilder.setLength(mapId.length()); - dataSplits.add(rowBuilder.append(":d:").append(bucketId).toBytes()); - } - Collections.sort(dataSplits); - - List<Bytes> updateSplits = new ArrayList<>(); - for (int i = opts.getBucketsPerTablet(); i < opts.numBuckets; i += opts.getBucketsPerTablet()) { - String bucketId = BucketUtil.genBucketId(i, opts.numBuckets); - rowBuilder.setLength(mapId.length()); - updateSplits.add(rowBuilder.append(":u:").append(bucketId).toBytes()); - } - Collections.sort(updateSplits); - - Bytes dataRangeEnd = Bytes.of(opts.mapId + DATA_RANGE_END); - Bytes updateRangeEnd = Bytes.of(opts.mapId + UPDATE_RANGE_END); - - List<Bytes> splits = new 
ArrayList<>(); - splits.add(dataRangeEnd); - splits.add(updateRangeEnd); - splits.addAll(dataSplits); - splits.addAll(updateSplits); - - Pirtos pirtos = new Pirtos(); - pirtos.setSplits(splits); - - pirtos.setTabletGroupingRegex(Pattern.quote(mapId + ":") + "[du]:"); - - return pirtos; - } - - private static byte[] encSeq(long l) { - byte[] ret = new byte[8]; - ret[0] = (byte) (l >>> 56); - ret[1] = (byte) (l >>> 48); - ret[2] = (byte) (l >>> 40); - ret[3] = (byte) (l >>> 32); - ret[4] = (byte) (l >>> 24); - ret[5] = (byte) (l >>> 16); - ret[6] = (byte) (l >>> 8); - ret[7] = (byte) (l >>> 0); - return ret; - } -}