Fixes #743: made data classes final
Project: http://git-wip-us.apache.org/repos/asf/incubator-fluo/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-fluo/commit/2805e2fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-fluo/tree/2805e2fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-fluo/diff/2805e2fa Branch: refs/heads/master Commit: 2805e2fadbcc1dedfcc9c422e642d39fefe3cdfc Parents: 051a33b Author: Keith Turner <ke...@deenlo.com> Authored: Thu Jul 28 17:32:55 2016 -0400 Committer: Keith Turner <ktur...@apache.org> Committed: Fri Jul 29 15:31:51 2016 -0400 ---------------------------------------------------------------------- .../apache/fluo/accumulo/data/MutableBytes.java | 135 ------------------- .../fluo/accumulo/data/MutableBytesFactory.java | 30 ----- .../java/org/apache/fluo/api/data/Bytes.java | 109 +++++++++------ .../java/org/apache/fluo/api/data/Column.java | 2 +- .../org/apache/fluo/api/data/ColumnValue.java | 2 +- .../org/apache/fluo/api/data/RowColumn.java | 2 +- .../apache/fluo/api/data/RowColumnValue.java | 77 +++++++---- .../java/org/apache/fluo/api/data/Span.java | 2 +- .../org/apache/fluo/core/impl/Notification.java | 28 ++-- .../apache/fluo/core/impl/TransactionImpl.java | 6 +- .../java/org/apache/fluo/core/util/Hex.java | 2 +- .../fluo/core/worker/NotificationProcessor.java | 10 +- .../finder/hash/HashNotificationFinder.java | 6 +- .../apache/fluo/core/data/MutableBytesTest.java | 52 ------- .../fluo/core/data/RowColumnValueTest.java | 7 - .../fluo/mapreduce/FluoEntryInputFormat.java | 2 +- 16 files changed, 158 insertions(+), 314 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytes.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytes.java 
b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytes.java deleted file mode 100644 index 898382f..0000000 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytes.java +++ /dev/null @@ -1,135 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.accumulo.data; - -import java.io.Serializable; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -import org.apache.fluo.api.data.Bytes; - -/** - * An implementation of {@link Bytes} that is mutable and uses a backing byte array - */ -public class MutableBytes extends Bytes implements Serializable { - - private static final long serialVersionUID = 1L; - - private final byte[] data; - private final int offset; - private final int length; - - public MutableBytes() { - this.data = null; - this.offset = 0; - this.length = 0; - } - - /** - * Creates a new MutableBytes. The given byte array is used directly as the backing array so later - * changes made to the array reflect into the new sequence. 
- */ - public MutableBytes(byte[] data) { - this.data = data; - this.offset = 0; - this.length = data.length; - } - - /** - * Creates a new MutableBytes from a subsequence of the given byte array. The given byte array is - * used directly as the backing array, so later changes made to the (relevant portion of the) - * array reflect into the new sequence. - * - * @param data byte data - * @param offset starting offset in byte array (inclusive) - * @param length number of bytes to include in sequence - * @throws IllegalArgumentException if the offset or length are out of bounds for the given byte - * array - */ - public MutableBytes(byte[] data, int offset, int length) { - if (offset < 0 || offset > data.length || length < 0 || (offset + length) > data.length) { - throw new IllegalArgumentException(" Bad offset and/or length data.length = " + data.length - + " offset = " + offset + " length = " + length); - } - this.data = data; - this.offset = offset; - this.length = length; - } - - /** - * Creates a new MutableBytes from the given string. The bytes are determined from the string - * using UTF-8 encoding - * - * @param s String to represent as Bytes - */ - public MutableBytes(String s) { - this(s.getBytes(StandardCharsets.UTF_8)); - } - - /** - * Creates a new MutableBytes from the given string. 
The bytes are determined from the string - * using the specified charset - * - * @param s String to represent as Bytes - * @param cs Charset - */ - public MutableBytes(String s, Charset cs) { - this(s.getBytes(cs)); - } - - @Override - public byte byteAt(int i) { - - if (i < 0) { - throw new IllegalArgumentException("i < 0, " + i); - } - - if (i >= length) { - throw new IllegalArgumentException("i >= length, " + i + " >= " + length); - } - - return data[offset + i]; - } - - @Override - public int length() { - return length; - } - - @Override - public Bytes subSequence(int start, int end) { - if (start > end || start < 0 || end > length) { - throw new IllegalArgumentException("Bad start and/end start = " + start + " end=" + end - + " offset=" + offset + " length=" + length); - } - return new MutableBytes(data, offset + start, end - start); - } - - @Override - public byte[] toArray() { - byte[] copy = new byte[length]; - System.arraycopy(data, offset, copy, 0, length); - return copy; - } - - /** - * Creates UTF-8 String using Bytes data - */ - @Override - public String toString() { - return new String(data, offset, length, StandardCharsets.UTF_8); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytesFactory.java ---------------------------------------------------------------------- diff --git a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytesFactory.java b/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytesFactory.java deleted file mode 100644 index a8b4dab..0000000 --- a/modules/accumulo/src/main/java/org/apache/fluo/accumulo/data/MutableBytesFactory.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. 
The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.accumulo.data; - -import org.apache.fluo.api.data.Bytes; - -/** - * An implementation of BytesFactory - */ -public class MutableBytesFactory implements Bytes.BytesFactory { - - @Override - public Bytes get(byte[] data) { - return new MutableBytes(data); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java index b02e96a..3728243 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java +++ b/modules/api/src/main/java/org/apache/fluo/api/data/Bytes.java @@ -37,19 +37,14 @@ import java.util.Objects; * * @since 1.0.0 */ -public abstract class Bytes implements Comparable<Bytes>, Serializable { +public final class Bytes implements Comparable<Bytes>, Serializable { private static final long serialVersionUID = 1L; - private static final String BYTES_FACTORY_CLASS = - "org.apache.fluo.accumulo.data.MutableBytesFactory"; private static final String WRITE_UTIL_CLASS = "org.apache.fluo.accumulo.data.WriteUtilImpl"; - /** - * @since 1.0.0 - */ - public interface BytesFactory { - Bytes get(byte[] data); - } + private final byte[] data; + private final int offset; + private 
final int length; /** * @since 1.0.0 @@ -60,13 +55,10 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable { int readVInt(DataInput stream) throws IOException; } - private static BytesFactory bytesFactory; private static WriteUtil writeUtil; static { try { - bytesFactory = - (BytesFactory) Class.forName(BYTES_FACTORY_CLASS).getDeclaredConstructor().newInstance(); writeUtil = (WriteUtil) Class.forName(WRITE_UTIL_CLASS).getDeclaredConstructor().newInstance(); } catch (Exception e) { @@ -74,11 +66,25 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable { } } - public static final Bytes EMPTY = bytesFactory.get(new byte[0]); + public static final Bytes EMPTY = new Bytes(new byte[0]); private Integer hashCode = null; - public Bytes() {} + private Bytes(byte[] data) { + this.data = data; + this.offset = 0; + this.length = data.length; + } + + private Bytes(byte[] data, int offset, int length) { + if (offset < 0 || offset > data.length || length < 0 || (offset + length) > data.length) { + throw new IndexOutOfBoundsException(" Bad offset and/or length data.length = " + data.length + + " offset = " + offset + " length = " + length); + } + this.data = data; + this.offset = offset; + this.length = length; + } /** * Gets a byte within this sequence of bytes @@ -87,12 +93,25 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable { * @return byte * @throws IllegalArgumentException if i is out of range */ - public abstract byte byteAt(int i); + public byte byteAt(int i) { + + if (i < 0) { + throw new IndexOutOfBoundsException("i < 0, " + i); + } + + if (i >= length) { + throw new IndexOutOfBoundsException("i >= length, " + i + " >= " + length); + } + + return data[offset + i]; + } /** * Gets the length of bytes */ - public abstract int length(); + public int length() { + return length; + } /** * Returns a portion of the Bytes object @@ -100,17 +119,35 @@ public abstract class Bytes implements Comparable<Bytes>, 
Serializable { * @param start index of subsequence start (inclusive) * @param end index of subsequence end (exclusive) */ - public abstract Bytes subSequence(int start, int end); + public Bytes subSequence(int start, int end) { + if (start > end || start < 0 || end > length) { + throw new IllegalArgumentException("Bad start and/end start = " + start + " end=" + end + + " offset=" + offset + " length=" + length); + } + return new Bytes(data, offset + start, end - start); + } /** * Returns a byte array containing a copy of the bytes */ - public abstract byte[] toArray(); + public byte[] toArray() { + byte[] copy = new byte[length]; + System.arraycopy(data, offset, copy, 0, length); + return copy; + } /** - * Compares the two given byte sequences, byte by byte, returning a negative, zero, or positive - * result if the first sequence is less than, equal to, or greater than the second. The comparison - * is performed starting with the first byte of each sequence, and proceeds until a pair of bytes + * Creates UTF-8 String using Bytes data + */ + @Override + public String toString() { + return new String(data, offset, length, StandardCharsets.UTF_8); + } + + /** + * Compares this to the given bytes, byte by byte, returning a negative, zero, or positive result + * if the first sequence is less than, equal to, or greater than the second. The comparison is + * performed starting with the first byte of each sequence, and proceeds until a pair of bytes * differs, or one sequence runs out of byte (is shorter). A shorter sequence is considered less * than a longer one. 
* @@ -118,27 +155,19 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable { * @param b2 second byte sequence to compare * @return comparison result */ - public static final int compareBytes(Bytes b1, Bytes b2) { - - int minLen = Math.min(b1.length(), b2.length()); + @Override + public final int compareTo(Bytes other) { + int minLen = Math.min(this.length(), other.length()); for (int i = 0; i < minLen; i++) { - int a = (b1.byteAt(i) & 0xff); - int b = (b2.byteAt(i) & 0xff); + int a = (this.byteAt(i) & 0xff); + int b = (other.byteAt(i) & 0xff); if (a != b) { return a - b; } } - return b1.length() - b2.length(); - } - - /** - * Compares this Bytes object to another. - */ - @Override - public final int compareTo(Bytes other) { - return compareBytes(this, other); + return this.length() - other.length(); } /** @@ -184,7 +213,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable { } byte[] copy = new byte[array.length]; System.arraycopy(array, 0, copy, 0, array.length); - return bytesFactory.get(copy); + return new Bytes(copy); } /** @@ -201,7 +230,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable { } byte[] copy = new byte[length]; System.arraycopy(data, offset, copy, 0, length); - return bytesFactory.get(copy); + return new Bytes(copy); } /** @@ -215,7 +244,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable { byte[] data = new byte[bb.remaining()]; // duplicate so that it does not change position bb.duplicate().get(data); - return bytesFactory.get(data); + return new Bytes(data); } /** @@ -227,7 +256,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable { return EMPTY; } byte[] data = s.getBytes(StandardCharsets.UTF_8); - return bytesFactory.get(data); + return new Bytes(data); } /** @@ -240,7 +269,7 @@ public abstract class Bytes implements Comparable<Bytes>, Serializable { return EMPTY; } byte[] data = s.getBytes(c); - return bytesFactory.get(data); + 
return new Bytes(data); } /** http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/Column.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Column.java b/modules/api/src/main/java/org/apache/fluo/api/data/Column.java index 8d17a93..8876249 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/data/Column.java +++ b/modules/api/src/main/java/org/apache/fluo/api/data/Column.java @@ -25,7 +25,7 @@ import java.util.Objects; * * @since 1.0.0 */ -public class Column implements Comparable<Column>, Serializable { +public final class Column implements Comparable<Column>, Serializable { private static final long serialVersionUID = 1L; public static final Bytes UNSET = Bytes.of(new byte[0]); http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java index 4080e8c..dc65057 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java +++ b/modules/api/src/main/java/org/apache/fluo/api/data/ColumnValue.java @@ -21,7 +21,7 @@ import java.io.Serializable; * @since 1.0.0 */ -public class ColumnValue implements Serializable, Comparable<ColumnValue> { +public final class ColumnValue implements Serializable, Comparable<ColumnValue> { private static final long serialVersionUID = 1L; private Column column; http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java 
b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java index 1fa43d7..48b18b3 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java +++ b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumn.java @@ -24,7 +24,7 @@ import java.util.Objects; * * @since 1.0.0 */ -public class RowColumn implements Comparable<RowColumn>, Serializable { +public final class RowColumn implements Comparable<RowColumn>, Serializable { private static final long serialVersionUID = 1L; public static RowColumn EMPTY = new RowColumn(); http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java index de32948..62968be 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java +++ b/modules/api/src/main/java/org/apache/fluo/api/data/RowColumnValue.java @@ -15,18 +15,24 @@ package org.apache.fluo.api.data; +import java.io.Serializable; +import java.util.Objects; + /** * An immutable object that can hold a row, column, and value. 
* * @since 1.0.0 */ -public class RowColumnValue extends RowColumn { +public final class RowColumnValue implements Comparable<RowColumnValue>, Serializable { private static final long serialVersionUID = 1L; + private Bytes row = Bytes.EMPTY; + private Column col = Column.EMPTY; private Bytes val = Bytes.EMPTY; public RowColumnValue(Bytes row, Column col, Bytes val) { - super(row, col); + this.row = row; + this.col = col; this.val = val; } @@ -35,10 +41,38 @@ public class RowColumnValue extends RowColumn { * @param val (will be UTF-8 encoded) */ public RowColumnValue(String row, Column col, String val) { - super(Bytes.of(row), col); + this.row = Bytes.of(row); + this.col = col; this.val = Bytes.of(val); } + /** + * Retrieves Row in RowColumn + * + * @return Row + */ + public Bytes getRow() { + return row; + } + + /** + * Retrieves Row in RowColumn as a String using UTF-8 encoding. + * + * @return Row + */ + public String getsRow() { + return row.toString(); + } + + /** + * Retrieves Column in RowColumn + * + * @return Column + */ + public Column getColumn() { + return col; + } + public Bytes getValue() { return val; } @@ -47,9 +81,13 @@ public class RowColumnValue extends RowColumn { return val.toString(); } + public RowColumn getRowColumn() { + return new RowColumn(row, col); + } + @Override public int hashCode() { - return super.hashCode() + 31 * val.hashCode(); + return Objects.hash(row, col, val); } @Override @@ -60,34 +98,25 @@ public class RowColumnValue extends RowColumn { if (o instanceof RowColumnValue) { RowColumnValue orcv = (RowColumnValue) o; - - if (super.equals(orcv)) { - return val.equals(orcv.val); - } + return row.equals(orcv.row) && col.equals(orcv.col) && val.equals(orcv.val); } return false; } @Override - public int compareTo(RowColumn orc) { - if (orc == this) { - return 0; - } - - if (!(orc instanceof RowColumnValue)) { - throw new IllegalArgumentException("Can only compare to same type"); - } + public String toString() { + return 
getRowColumn() + " " + val; + } - int result = super.compareTo(orc); + @Override + public int compareTo(RowColumnValue o) { + int result = row.compareTo(o.row); if (result == 0) { - RowColumnValue orcv = (RowColumnValue) orc; - result = val.compareTo(orcv.val); + result = col.compareTo(o.col); + if (result == 0) { + result = val.compareTo(o.val); + } } return result; } - - @Override - public String toString() { - return super.toString() + " " + val; - } } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/api/src/main/java/org/apache/fluo/api/data/Span.java ---------------------------------------------------------------------- diff --git a/modules/api/src/main/java/org/apache/fluo/api/data/Span.java b/modules/api/src/main/java/org/apache/fluo/api/data/Span.java index 6ac084b..a5a9f94 100644 --- a/modules/api/src/main/java/org/apache/fluo/api/data/Span.java +++ b/modules/api/src/main/java/org/apache/fluo/api/data/Span.java @@ -24,7 +24,7 @@ import java.util.Objects; * * @since 1.0.0 */ -public class Span implements Serializable { +public final class Span implements Serializable { private static final long serialVersionUID = 1L; private RowColumn start = RowColumn.EMPTY; http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java b/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java index 4db011d..17c3e00 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/Notification.java @@ -39,13 +39,13 @@ import static org.apache.fluo.accumulo.util.NotificationUtil.isDelete; * See {@link NotificationIterator} for explanation of notification timestamp serialization. 
* */ -public class Notification extends RowColumn { - private static final long serialVersionUID = 1L; +public class Notification { - private long timestamp; + private final RowColumn rowCol; + private final long timestamp; public Notification(Bytes row, Column col, long ts) { - super(row, col); + rowCol = new RowColumn(row, col); this.timestamp = ts; } @@ -53,15 +53,27 @@ public class Notification extends RowColumn { return timestamp; } + public Bytes getRow() { + return rowCol.getRow(); + } + + public Column getColumn() { + return rowCol.getColumn(); + } + + public RowColumn getRowColumn() { + return rowCol; + } + public Flutation newDelete(Environment env) { return newDelete(env, getTimestamp()); } public Flutation newDelete(Environment env, long ts) { - Flutation m = new Flutation(env, getRow()); - ColumnVisibility cv = env.getSharedResources().getVisCache().getCV(getColumn()); - m.put(ColumnConstants.NOTIFY_CF.toArray(), encodeCol(getColumn()), cv, encodeTs(ts, true), - TransactionImpl.EMPTY); + Flutation m = new Flutation(env, rowCol.getRow()); + ColumnVisibility cv = env.getSharedResources().getVisCache().getCV(rowCol.getColumn()); + m.put(ColumnConstants.NOTIFY_CF.toArray(), encodeCol(rowCol.getColumn()), cv, + encodeTs(ts, true), TransactionImpl.EMPTY); return m; } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java index ca84807..f25099b 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java +++ b/modules/core/src/main/java/org/apache/fluo/core/impl/TransactionImpl.java @@ -510,11 +510,11 @@ public class TransactionImpl implements AsyncTransaction, Snapshot { if (notification.getColumn().equals(col)) { // 
check to see if ACK exist after notification - Key startKey = SpanUtil.toKey(notification); + Key startKey = SpanUtil.toKey(notification.getRowColumn()); startKey.setTimestamp(ColumnConstants.ACK_PREFIX | (Long.MAX_VALUE & ColumnConstants.TIMESTAMP_MASK)); - Key endKey = SpanUtil.toKey(notification); + Key endKey = SpanUtil.toKey(notification.getRowColumn()); endKey.setTimestamp(ColumnConstants.ACK_PREFIX | (notification.getTimestamp() + 1)); Range range = new Range(startKey, endKey); @@ -786,7 +786,7 @@ public class TransactionImpl implements AsyncTransaction, Snapshot { if (primary != null) { primRow = primary.getRow(); primCol = primary.getColumn(); - if (notification != null && !primary.equals(notification)) { + if (notification != null && !primary.equals(notification.getRowColumn())) { throw new IllegalArgumentException("Primary must be notification"); } } else if (notification != null) { http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java b/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java index f08a4ea..f9fc85f 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java +++ b/modules/core/src/main/java/org/apache/fluo/core/util/Hex.java @@ -73,7 +73,7 @@ public class Hex { public static String encNonAscii(Notification n) { StringBuilder sb = new StringBuilder(); - encNonAscii(sb, n, " "); + encNonAscii(sb, n.getRowColumn(), " "); sb.append(" "); sb.append(n.getTimestamp()); return sb.toString(); http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java 
b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java index e39bfbf..1834835 100644 --- a/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java +++ b/modules/core/src/main/java/org/apache/fluo/core/worker/NotificationProcessor.java @@ -182,14 +182,14 @@ public class NotificationProcessor implements AutoCloseable { new WorkTaskAsync(this, notificationFinder, env, notification, observers); FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask); - if (!tracker.add(notification, ft)) { + if (!tracker.add(notification.getRowColumn(), ft)) { return false; } try { executor.execute(ft); } catch (RejectedExecutionException rje) { - tracker.remove(notification); + tracker.remove(notification.getRowColumn()); throw rje; } @@ -203,18 +203,18 @@ public class NotificationProcessor implements AutoCloseable { new WorkTaskAsync(this, notificationFinder, env, notification, observers); FutureTask<?> ft = new FutureNotificationTask(notification, notificationFinder, workTask); - if (tracker.requeue(notification, ft)) { + if (tracker.requeue(notification.getRowColumn(), ft)) { try { executor.execute(ft); } catch (RejectedExecutionException rje) { - tracker.remove(notification); + tracker.remove(notification.getRowColumn()); throw rje; } } } public void notificationProcessed(final Notification notification) { - tracker.remove(notification); + tracker.remove(notification.getRowColumn()); } public int size() { http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java index b71954b..e18ec2f 100644 --- 
a/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java +++ b/modules/core/src/main/java/org/apache/fluo/core/worker/finder/hash/HashNotificationFinder.java @@ -34,7 +34,6 @@ import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode.Mode; import org.apache.curator.utils.ZKPaths; -import org.apache.fluo.accumulo.data.MutableBytes; import org.apache.fluo.accumulo.iterators.NotificationHashFilter; import org.apache.fluo.accumulo.util.NotificationUtil; import org.apache.fluo.accumulo.util.ZookeeperPath; @@ -193,9 +192,8 @@ public class HashNotificationFinder implements NotificationFinder { @VisibleForTesting static boolean shouldProcess(Notification notification, int divisor, int remainder) { byte[] cfcq = NotificationUtil.encodeCol(notification.getColumn()); - return NotificationHashFilter.accept( - ByteUtil.toByteSequence((MutableBytes) notification.getRow()), new ArrayByteSequence(cfcq), - divisor, remainder); + return NotificationHashFilter.accept(ByteUtil.toByteSequence(notification.getRow()), + new ArrayByteSequence(cfcq), divisor, remainder); } @Override http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/test/java/org/apache/fluo/core/data/MutableBytesTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/fluo/core/data/MutableBytesTest.java b/modules/core/src/test/java/org/apache/fluo/core/data/MutableBytesTest.java deleted file mode 100644 index c6f4a1d..0000000 --- a/modules/core/src/test/java/org/apache/fluo/core/data/MutableBytesTest.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. 
See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ - -package org.apache.fluo.core.data; - -import org.apache.fluo.accumulo.data.MutableBytes; -import org.apache.fluo.api.data.Bytes; -import org.junit.Assert; -import org.junit.Test; - -/** - * Unit test for {@link MutableBytes} - */ -public class MutableBytesTest { - - @Test - public void testImmutableBytes() { - byte[] d1 = Bytes.of("mydata").toArray(); - - MutableBytes mutable = new MutableBytes(d1); - Assert.assertNotSame(d1, mutable.toArray()); - - Bytes immutable = Bytes.of(d1); - Assert.assertNotSame(d1, immutable.toArray()); - Assert.assertEquals(mutable, immutable); - Assert.assertNotSame(mutable, immutable); - - Bytes read = mutable; - Assert.assertEquals(read, immutable); - Assert.assertSame(read, mutable); - Assert.assertEquals(read, mutable); - Assert.assertNotSame(d1, read.toArray()); - - MutableBytes write = (MutableBytes) immutable; - Assert.assertEquals(write, mutable); - Assert.assertNotSame(write, mutable); - byte[] d2 = write.toArray(); - Assert.assertNotSame(d2, write.toArray()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java ---------------------------------------------------------------------- diff --git 
a/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java b/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java index f7a47d4..63348b1 100644 --- a/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java +++ b/modules/core/src/test/java/org/apache/fluo/core/data/RowColumnValueTest.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.fluo.api.data.Bytes; import org.apache.fluo.api.data.Column; -import org.apache.fluo.api.data.RowColumn; import org.apache.fluo.api.data.RowColumnValue; import org.junit.Assert; import org.junit.Test; @@ -85,10 +84,4 @@ public class RowColumnValueTest { public void testToString() { Assert.assertEquals("row1 fam1 qual1 a", rcv1.toString()); } - - @Test(expected = IllegalArgumentException.class) - public void testCompareRowColumn() { - rcv1.compareTo(new RowColumn("foo")); - } - } http://git-wip-us.apache.org/repos/asf/incubator-fluo/blob/2805e2fa/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java ---------------------------------------------------------------------- diff --git a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java index 18c091c..d8ea54b 100644 --- a/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java +++ b/modules/mapreduce/src/main/java/org/apache/fluo/mapreduce/FluoEntryInputFormat.java @@ -76,7 +76,7 @@ public class FluoEntryInputFormat extends InputFormat<RowColumn, Bytes> { @Override public RowColumn getCurrentKey() throws IOException, InterruptedException { - return rowColVal; + return rowColVal.getRowColumn(); } @Override