Repository: beam Updated Branches: refs/heads/master 63b54a5b0 -> 480e60fa4
OrderedCode library provides encoding a sequence of typed enities into a byte array that can be lexicographically sorted. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/57bde534 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/57bde534 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/57bde534 Branch: refs/heads/master Commit: 57bde5340ffc94735ec374c88ef5374323d6321b Parents: 63b54a5 Author: Mairbek Khadikov <mair...@google.com> Authored: Fri Sep 29 16:43:05 2017 -0700 Committer: chamik...@google.com <chamik...@google.com> Committed: Fri Oct 6 09:05:05 2017 -0700 ---------------------------------------------------------------------- .../beam/sdk/io/gcp/spanner/OrderedCode.java | 764 ++++++++++++++++ .../sdk/io/gcp/spanner/OrderedCodeTest.java | 890 +++++++++++++++++++ 2 files changed, 1654 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/57bde534/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCode.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCode.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCode.java new file mode 100644 index 0000000..80290d6 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCode.java @@ -0,0 +1,764 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.math.LongMath; +import com.google.common.primitives.Longs; +import com.google.common.primitives.UnsignedInteger; + +import java.math.RoundingMode; +import java.util.ArrayList; +import java.util.Arrays; + +/** + * This module provides routines for encoding a sequence of typed + * entities into a byte array. The resulting byte arrays can be + * lexicographically compared to yield the same comparison value that + * would have been generated if the encoded items had been compared + * one by one according to their type. + * + * <p>More precisely, suppose: + * 1. byte array A is generated by encoding the sequence of items [A_1..A_n] + * 2. byte array B is generated by encoding the sequence of items [B_1..B_n] + * 3. The types match; i.e., for all i: A_i was encoded using + * the same routine as B_i + * + * <p>Then: + * Comparing A vs. B lexicographically is the same as comparing + * the vectors [A_1..A_n] and [B_1..B_n] lexicographically. + * + * <p><b>This class is NOT thread safe.</b> + */ +class OrderedCode { + // We want to encode a few extra symbols in strings: + // <sep> Separator between items + // <infinity> Infinite string + // + // Therefore we need an alphabet with at least 258 characters. We + // achieve this by using two-letter sequences starting with '\0' and '\xff' + // as extra symbols: + // <sep> encoded as => \0\1 + // \0 encoded as => \0\xff + // \xff encoded as => \xff\x00 + // <infinity> encoded as => \xff\xff + // + // The remaining two letter sequences starting with '\0' and '\xff' + // are currently unused. + + static final byte ESCAPE1 = 0x00; + static final byte NULL_CHARACTER = + (byte) 0xff; // Combined with ESCAPE1 + static final byte SEPARATOR = 0x01; // Combined with ESCAPE1 + + static final byte ESCAPE2 = (byte) 0xff; + static final byte INFINITY = + (byte) 0xff; // Combined with ESCAPE2 + static final byte FF_CHARACTER = 0x00; // Combined with ESCAPE2 + + static final byte[] ESCAPE1_SEPARATOR = { ESCAPE1, SEPARATOR }; + + static final byte[] INFINITY_ENCODED = { ESCAPE2, INFINITY }; + + static final byte[] INFINITY_ENCODED_DECREASING = {invert(ESCAPE2), invert(INFINITY)}; + + /** + * This array maps encoding length to header bits in the first two bytes for + * SignedNumIncreasing encoding. + */ + private static final byte[][] LENGTH_TO_HEADER_BITS = { + { 0, 0 }, + { (byte) 0x80, 0 }, + { (byte) 0xc0, 0 }, + { (byte) 0xe0, 0 }, + { (byte) 0xf0, 0 }, + { (byte) 0xf8, 0 }, + { (byte) 0xfc, 0 }, + { (byte) 0xfe, 0 }, + { (byte) 0xff, 0 }, + { (byte) 0xff, (byte) 0x80 }, + { (byte) 0xff, (byte) 0xc0 } + }; + + /** + * This array maps encoding lengths to the header bits that overlap with + * the payload and need fixing during readSignedNumIncreasing. + */ + private static final long[] LENGTH_TO_MASK = { + 0L, + 0x80L, + 0xc000L, + 0xe00000L, + 0xf0000000L, + 0xf800000000L, + 0xfc0000000000L, + 0xfe000000000000L, + 0xff00000000000000L, + 0x8000000000000000L, + 0L + }; + + /** + * This array maps the number of bits in a number to the encoding + * length produced by WriteSignedNumIncreasing. + * For positive numbers, the number of bits is 1 plus the most significant + * bit position (the highest bit position in a positive long is 63). + * For a negative number n, we count the bits in ~n. + * That is, length = BITS_TO_LENGTH[log2Floor(n < 0 ? ~n : n) + 1]. + */ + private static final short[] BITS_TO_LENGTH = { + 1, 1, 1, 1, 1, 1, 1, + 2, 2, 2, 2, 2, 2, 2, + 3, 3, 3, 3, 3, 3, 3, + 4, 4, 4, 4, 4, 4, 4, + 5, 5, 5, 5, 5, 5, 5, + 6, 6, 6, 6, 6, 6, 6, + 7, 7, 7, 7, 7, 7, 7, + 8, 8, 8, 8, 8, 8, 8, + 9, 9, 9, 9, 9, 9, 9, + 10 + }; + + // stores the current encoded value as a list of byte arrays. Note that this + // is manipulated as we read/write items. + // Note that every item will fit on at most one array. One array may + // have more than one item (eg when used for decoding). While encoding, + // one array will have exactly one item. While returning the encoded array + // we will merge all the arrays in this list. + private final ArrayList<byte[]> encodedArrays = new ArrayList<byte[]>(); + + // This is the current position on the first array. Will be non-zero + // only if the ordered code was created using encoded byte array. + private int firstArrayPosition = 0; + + /** + * Creates OrderedCode from scratch. Typically used at encoding time. + */ + public OrderedCode(){ + } + + /** + * Creates OrderedCode from a given encoded byte array. Typically used at + * decoding time. + * + * <p><b> For better performance, it uses the input array provided (not a copy). + * Therefore the input array should not be modified.</b> + */ + public OrderedCode(byte[] encodedByteArray) { + encodedArrays.add(encodedByteArray); + } + + /** + * Adds the given byte array item to the OrderedCode. It encodes the input + * byte array, followed by a separator and appends the result to its + * internal encoded byte array store. + * + * <p>It works with the input array, + * so the input array 'value' should not be modified till the method returns. + * + * @param value bytes to be written. + * @see #readBytes() + */ + public void writeBytes(byte[] value) { + writeBytes(value, false); + } + + public void writeBytesDecreasing(byte[] value) { + writeBytes(value, true); + } + + private void writeBytes(byte[] value, boolean invert) { + // Determine the length of the encoded array + int encodedLength = 2; // for separator + for (byte b : value) { + if ((b == ESCAPE1) || (b == ESCAPE2)) { + encodedLength += 2; + } else { + encodedLength++; + } + } + + byte[] encodedArray = new byte[encodedLength]; + int copyStart = 0; + int outIndex = 0; + for (int i = 0; i < value.length; i++) { + byte b = value[i]; + if (b == ESCAPE1) { + arraycopy(invert, value, copyStart, encodedArray, outIndex, + i - copyStart); + outIndex += i - copyStart; + encodedArray[outIndex++] = convert(invert, ESCAPE1); + encodedArray[outIndex++] = convert(invert, NULL_CHARACTER); + copyStart = i + 1; + } else if (b == ESCAPE2) { + arraycopy(invert, value, copyStart, encodedArray, outIndex, + i - copyStart); + outIndex += i - copyStart; + encodedArray[outIndex++] = convert(invert, ESCAPE2); + encodedArray[outIndex++] = convert(invert, FF_CHARACTER); + copyStart = i + 1; + } + } + if (copyStart < value.length) { + arraycopy(invert, value, copyStart, encodedArray, outIndex, + value.length - copyStart); + outIndex += value.length - copyStart; + } + encodedArray[outIndex++] = convert(invert, ESCAPE1); + encodedArray[outIndex] = convert(invert, SEPARATOR); + + encodedArrays.add(encodedArray); + } + + private static byte convert(boolean invert, byte val) { + return invert ? (byte) ~val : val; + } + + private static byte invert(byte val) { + return convert(true, val); + } + + private void arraycopy( + boolean invert, byte[] src, int srcPos, byte[] dest, int destPos, int length) { + System.arraycopy(src, srcPos, dest, destPos, length); + if (invert) { + for (int i = destPos; i < destPos + length; i++) { + dest[i] = (byte) ~dest[i]; + } + } + } + + /** + * Encodes the long item, in big-endian format, and appends the result to its + * internal encoded byte array store. + * + * @see #readNumIncreasing() + */ + public void writeNumIncreasing(long value) { + // Values are encoded with a single byte length prefix, followed + // by the actual value in big-endian format with leading 0 bytes + // dropped. + byte[] bufer = new byte[9]; // 8 bytes for value plus one byte for length + int len = 0; + while (value != 0) { + len++; + bufer[9 - len] = (byte) (value & 0xff); + value >>>= 8; + } + bufer[9 - len - 1] = (byte) len; + len++; + byte[] encodedArray = new byte[len]; + System.arraycopy(bufer, 9 - len, encodedArray, 0, len); + encodedArrays.add(encodedArray); + } + + public void writeNumIncreasing(UnsignedInteger unsignedInt) { + writeNumIncreasing(unsignedInt.longValue()); + } + + /** + * Encodes the long item, in big-endian format, and appends the result to its + * internal encoded byte array store. + * + * @see #readNumIncreasing() + */ + public void writeNumDecreasing(long value) { + // Values are encoded with a complemented single byte length prefix, + // followed by the complement of the actual value in big-endian format with + // leading 0xff bytes dropped. + byte[] bufer = new byte[9]; // 8 bytes for value plus one byte for length + int len = 0; + while (value != 0) { + len++; + bufer[9 - len] = (byte) ~(value & 0xff); + value >>>= 8; + } + bufer[9 - len - 1] = (byte) ~len; + len++; + byte[] encodedArray = new byte[len]; + System.arraycopy(bufer, 9 - len, encodedArray, 0, len); + encodedArrays.add(encodedArray); + } + + public void writeNumDecreasing(UnsignedInteger unsignedInt) { + writeNumDecreasing(unsignedInt.longValue()); + } + + /** + * Return floor(log2(n)) for positive integer n. Returns -1 iff n == 0. + * + */ + @VisibleForTesting + int log2Floor(long n) { + checkArgument(n >= 0); + return n == 0 ? -1 : LongMath.log2(n, RoundingMode.FLOOR); + } + + /** + * Calculates the encoding length in bytes of the signed number n. + */ + @VisibleForTesting + int getSignedEncodingLength(long n) { + return BITS_TO_LENGTH[log2Floor(n < 0 ? ~n : n) + 1]; + } + + /** + * @see #readSignedNumIncreasing() + */ + public void writeSignedNumIncreasing(long val) { + long x = val < 0 ? ~val : val; + if (x < 64) { // Fast path for encoding length == 1. + byte[] encodedArray = new byte[] { (byte) (LENGTH_TO_HEADER_BITS[1][0] ^ val) }; + encodedArrays.add(encodedArray); + return; + } + // buf = val in network byte order, sign extended to 10 bytes. + byte signByte = val < 0 ? (byte) 0xff : 0; + byte[] buf = new byte[2 + Longs.BYTES]; + buf[0] = buf[1] = signByte; + System.arraycopy(Longs.toByteArray(val), 0, buf, 2, Longs.BYTES); + int len = getSignedEncodingLength(x); + if (len < 2) { + throw new IllegalStateException( + "Invalid length (" + len + ")" + " returned by getSignedEncodingLength(" + x + ")"); + } + int beginIndex = buf.length - len; + buf[beginIndex] ^= LENGTH_TO_HEADER_BITS[len][0]; + buf[beginIndex + 1] ^= LENGTH_TO_HEADER_BITS[len][1]; + + byte[] encodedArray = new byte[len]; + System.arraycopy(buf, beginIndex, encodedArray, 0, len); + encodedArrays.add(encodedArray); + } + + public void writeSignedNumDecreasing(long val) { + writeSignedNumIncreasing(~val); + } + + /** + * Encodes and appends INFINITY item to its internal encoded byte array + * store. + * + * @see #readInfinity() + */ + public void writeInfinity() { + writeTrailingBytes(INFINITY_ENCODED); + } + + /** + * Encodes and appends INFINITY item which would come before any real string. + * + * @see #readInfinityDecreasing() + */ + public void writeInfinityDecreasing() { + writeTrailingBytes(INFINITY_ENCODED_DECREASING); + } + + /** + * Appends the byte array item to its internal encoded byte array + * store. This is used for the last item and is not encoded. + * + * <p>It stores the input array in the store, + * so the input array 'value' should not be modified. + * + * @param value bytes to be written. + * @see #readTrailingBytes() + */ + public void writeTrailingBytes(byte[] value) { + if ((value == null) || (value.length == 0)) { + throw new IllegalArgumentException( + "Value cannot be null or have 0 elements"); + } + + encodedArrays.add(value); + } + + /** + * Returns the next byte array item from its encoded byte array store and + * removes the item from the store. + * + * @see #writeBytes(byte[]) + */ + public byte[] readBytes() { + return readBytes(false); + } + + public byte[] readBytesDecreasing() { + return readBytes(true); + } + + private byte[] readBytes(boolean invert) { + if ((encodedArrays == null) || (encodedArrays.size() == 0) || ( + (encodedArrays.get(0)).length - firstArrayPosition <= 0)) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + // Determine the length of the decoded array + // We only scan up to "length-2" since a valid string must end with + // a two character terminator: 'ESCAPE1 SEPARATOR' + byte[] store = encodedArrays.get(0); + int decodedLength = 0; + boolean valid = false; + int i = firstArrayPosition; + while (i < store.length - 1) { + byte b = store[i++]; + if (b == convert(invert, ESCAPE1)) { + b = store[i++]; + if (b == convert(invert, SEPARATOR)) { + valid = true; + break; + } else if (b == convert(invert, NULL_CHARACTER)) { + decodedLength++; + } else { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + } else if (b == convert(invert, ESCAPE2)) { + b = store[i++]; + if (b == convert(invert, FF_CHARACTER)) { + decodedLength++; + } else { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + } else { + decodedLength++; + } + } + if (!valid) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + byte[] decodedArray = new byte[decodedLength]; + int copyStart = firstArrayPosition; + int outIndex = 0; + int j = firstArrayPosition; + while (j < store.length - 1) { + byte b = store[j++]; // note that j has been incremented + if (b == convert(invert, ESCAPE1)) { + arraycopy(invert, store, copyStart, decodedArray, outIndex, j - copyStart - 1); + outIndex += j - copyStart - 1; + // ESCAPE1 SEPARATOR ends component + // ESCAPE1 NULL_CHARACTER represents '\0' + b = store[j++]; + if (b == convert(invert, SEPARATOR)) { + if ((store.length - j) == 0) { + // we are done with the first array + encodedArrays.remove(0); + firstArrayPosition = 0; + } else { + firstArrayPosition = j; + } + return decodedArray; + } else if (b == convert(invert, NULL_CHARACTER)) { + decodedArray[outIndex++] = 0x00; + } // else not required - handled during length determination + copyStart = j; + } else if (b == convert(invert, ESCAPE2)) { + arraycopy(invert, store, copyStart, decodedArray, outIndex, j - copyStart - 1); + outIndex += j - copyStart - 1; + // ESCAPE2 FF_CHARACTER represents '\xff' + // ESCAPE2 INFINITY is an error + b = store[j++]; + if (b == convert(invert, FF_CHARACTER)) { + decodedArray[outIndex++] = (byte) 0xff; + } // else not required - handled during length determination + copyStart = j; + } + } + // not required due to the first phase, but need to entertain the compiler + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + /** + * Returns the next long item (encoded in big-endian format via + * {@code writeNumIncreasing(long)}) from its internal encoded byte array + * store and removes the item from the store. + * + * @see #writeNumIncreasing(long) + */ + public long readNumIncreasing() { + if ((encodedArrays == null) || (encodedArrays.size() == 0) || ( + (encodedArrays.get(0)).length - firstArrayPosition < 1)) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + byte[] store = encodedArrays.get(0); + // Decode length byte + int len = store[firstArrayPosition]; + if ((firstArrayPosition + len + 1 > store.length) || len > 8) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + long result = 0; + for (int i = 0; i < len; i++) { + result <<= 8; + result |= (store[firstArrayPosition + i + 1] & 0xff); + } + + if ((store.length - firstArrayPosition - len - 1) == 0) { + // we are done with the first array + encodedArrays.remove(0); + firstArrayPosition = 0; + } else { + firstArrayPosition = firstArrayPosition + len + 1; + } + + return result; + } + + /** + * Returns the next long item (encoded in big-endian format via + * {@code writeNumDecreasing(long)}) from its internal encoded byte array + * store and removes the item from the store. + * + * @see #writeNumDecreasing(long) + */ + public long readNumDecreasing() { + if ((encodedArrays == null) || (encodedArrays.size() == 0) + || ((encodedArrays.get(0)).length - firstArrayPosition < 1)) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + byte[] store = encodedArrays.get(0); + // Decode length byte + int len = ~store[firstArrayPosition] & 0xff; + if ((firstArrayPosition + len + 1 > store.length) || len > 8) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + long result = 0; + for (int i = 0; i < len; i++) { + result <<= 8; + result |= (~store[firstArrayPosition + i + 1] & 0xff); + } + + if ((store.length - firstArrayPosition - len - 1) == 0) { + // we are done with the first array + encodedArrays.remove(0); + firstArrayPosition = 0; + } else { + firstArrayPosition = firstArrayPosition + len + 1; + } + + return result; + } + + /** + * Returns the next long item (encoded via + * {@code writeSignedNumIncreasing(long)}) from its internal encoded byte + * array store and removes the item from the store. + * + * @see #writeSignedNumIncreasing(long) + */ + public long readSignedNumIncreasing() { + if ((encodedArrays == null) || (encodedArrays.size() == 0) || ( + (encodedArrays.get(0)).length - firstArrayPosition < 1)) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + byte[] store = encodedArrays.get(0); + + long xorMask = ((store[firstArrayPosition] & 0x80) == 0) ? ~0L : 0L; + // Store first byte as an int rather than a (signed) byte -- to avoid + // accidental byte-to-int promotion later which would extend the byte's + // sign bit (if any). + int firstByte = (store[firstArrayPosition] & 0xff) ^ (int) (xorMask & 0xff); + + // Now calculate and test length, and set x to raw (unmasked) result. + int len; + long x; + if (firstByte != 0xff) { + len = 7 - log2Floor(firstByte ^ 0xff); + if (store.length - firstArrayPosition < len) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + x = xorMask; // Sign extend using xorMask. + for (int i = firstArrayPosition; i < firstArrayPosition + len; i++) { + x = (x << 8) | (store[i] & 0xff); + } + } else { + len = 8; + if (store.length - firstArrayPosition < len) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + int secondByte = (store[firstArrayPosition + 1] & 0xff) ^ (int) (xorMask & 0xff); + if (secondByte >= 0x80) { + if (secondByte < 0xc0) { + len = 9; + } else { + int thirdByte = (store[firstArrayPosition + 2] & 0xff) ^ (int) (xorMask & 0xff); + if (secondByte == 0xc0 && thirdByte < 0x80) { + len = 10; + } else { + // Either len > 10 or len == 10 and #bits > 63. + throw new IllegalArgumentException("Invalid encoded byte array"); + } + } + if (store.length - firstArrayPosition < len) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + } + x = Longs.fromByteArray( + Arrays.copyOfRange(store, firstArrayPosition + len - 8, firstArrayPosition + len)); + } + + x ^= LENGTH_TO_MASK[len]; // Remove spurious header bits. + + if (len != getSignedEncodingLength(x)) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + if ((store.length - firstArrayPosition - len) == 0) { + // We are done with the first array. + encodedArrays.remove(0); + firstArrayPosition = 0; + } else { + firstArrayPosition = firstArrayPosition + len; + } + + return x; + } + + public long readSignedNumDecreasing() { + return ~readSignedNumIncreasing(); + } + + /** + * Removes INFINITY item from its internal encoded byte array store + * if present. Returns whether INFINITY was present. + * + * @see #writeInfinity() + */ + public boolean readInfinity() { + return readInfinityInternal(INFINITY_ENCODED); + } + + /** + * Removes INFINITY item from its internal encoded byte array store if present. Returns whether + * INFINITY was present. + * + * @see #writeInfinityDecreasing() + */ + public boolean readInfinityDecreasing() { + return readInfinityInternal(INFINITY_ENCODED_DECREASING); + } + + private boolean readInfinityInternal(byte[] codes) { + if ((encodedArrays == null) || (encodedArrays.size() == 0) + || ((encodedArrays.get(0)).length - firstArrayPosition < 1)) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + byte[] store = encodedArrays.get(0); + if (store.length - firstArrayPosition < 2) { + return false; + } + if ((store[firstArrayPosition] == codes[0]) && (store[firstArrayPosition + 1] == codes[1])) { + if ((store.length - firstArrayPosition - 2) == 0) { + // we are done with the first array + encodedArrays.remove(0); + firstArrayPosition = 0; + } else { + firstArrayPosition = firstArrayPosition + 2; + } + return true; + } else { + return false; + } + } + + /** + * Returns the trailing byte array item from its internal encoded byte array + * store and removes the item from the store. + * + * @see #writeTrailingBytes(byte[]) + */ + public byte[] readTrailingBytes() { + // one item is contained within one byte array + if ((encodedArrays == null) || (encodedArrays.size() != 1)) { + throw new IllegalArgumentException("Invalid encoded byte array"); + } + + byte[] store = encodedArrays.get(0); + encodedArrays.remove(0); + assert encodedArrays.size() == 0; + return Arrays.copyOfRange(store, firstArrayPosition, store.length); + } + + /** + * Returns the encoded bytes that represents the current state of the + * OrderedCode. + * + * <p><b> NOTE: This method returns OrederedCode's internal array (not a + * copy) for better performance. Therefore the returned array should not be + * modified.</b> + */ + public byte[] getEncodedBytes() { + if (encodedArrays.size() == 0) { + return new byte[0]; + } + if ((encodedArrays.size() == 1) && (firstArrayPosition == 0)) { + return encodedArrays.get(0); + } + + int totalLength = 0; + + for (int i = 0; i < encodedArrays.size(); i++) { + byte[] bytes = encodedArrays.get(i); + if (i == 0) { + totalLength += bytes.length - firstArrayPosition; + } else { + totalLength += bytes.length; + } + } + + byte[] encodedBytes = new byte[totalLength]; + int destPos = 0; + for (int i = 0; i < encodedArrays.size(); i++) { + byte[] bytes = encodedArrays.get(i); + if (i == 0) { + System.arraycopy(bytes, firstArrayPosition, encodedBytes, destPos, + bytes.length - firstArrayPosition); + destPos += bytes.length - firstArrayPosition; + } else { + System.arraycopy(bytes, 0, encodedBytes, destPos, bytes.length); + destPos += bytes.length; + } + } + + // replace the store with merged array, so that repeated calls + // don't need to merge. The reads can handle both the versions. + encodedArrays.clear(); + encodedArrays.add(encodedBytes); + firstArrayPosition = 0; + + return encodedBytes; + } + + /** + * Returns true if it has more encoded bytes that haven't been read, + * false otherwise. Return value of true doesn't imply anything about + * validity of remaining data. + * @return true if it has more encoded bytes that haven't been read, + * false otherwise. + */ + public boolean hasRemainingEncodedBytes() { + // We delete an array after fully consuming it. + return encodedArrays != null && encodedArrays.size() != 0; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/57bde534/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCodeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCodeTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCodeTest.java new file mode 100644 index 0000000..5be4826 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/OrderedCodeTest.java @@ -0,0 +1,890 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.auto.value.AutoValue; +import com.google.common.collect.ImmutableList; +import com.google.common.io.BaseEncoding; +import com.google.common.primitives.Bytes; +import com.google.common.primitives.UnsignedBytes; +import com.google.common.primitives.UnsignedInteger; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * A set of unit tests to verify {@link OrderedCode}. + */ +@RunWith(JUnit4.class) +public class OrderedCodeTest { + /** Data for a generic coding test case with known encoded outputs. */ + abstract static class CodingTestCase<T> { + /** The test value. */ + abstract T value(); + + /** + * Test value's encoding in increasing order (obtained from the C++ + * implementation). + */ + abstract String increasingBytes(); + + /** + * Test value's encoding in dencreasing order (obtained from the C++ + * implementation). + */ + abstract String decreasingBytes(); + + // Helper methods to implement in concrete classes. + + abstract byte[] encodeIncreasing(); + abstract byte[] encodeDecreasing(); + + T decodeIncreasing() { + return decodeIncreasing( + new OrderedCode(bytesFromHexString(increasingBytes()))); + } + + T decodeDecreasing() { + return decodeDecreasing( + new OrderedCode(bytesFromHexString(decreasingBytes()))); + } + + abstract T decodeIncreasing(OrderedCode orderedCode); + abstract T decodeDecreasing(OrderedCode orderedCode); + } + + @AutoValue + abstract static class UnsignedNumber extends CodingTestCase<Long> { + @Override + byte[] encodeIncreasing() { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeNumIncreasing(value()); + return orderedCode.getEncodedBytes(); + } + + @Override + byte[] encodeDecreasing() { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeNumDecreasing(value()); + return orderedCode.getEncodedBytes(); + } + + @Override + Long decodeIncreasing(OrderedCode orderedCode) { + return orderedCode.readNumIncreasing(); + } + + @Override + Long decodeDecreasing(OrderedCode orderedCode) { + return orderedCode.readNumDecreasing(); + } + + private static UnsignedNumber testCase( + long value, String increasingBytes, String decreasingBytes) { + return new AutoValue_OrderedCodeTest_UnsignedNumber( + value, increasingBytes, decreasingBytes); + } + + /** Test cases for unsigned numbers, in increasing (unsigned) order by value. */ + private static final ImmutableList<UnsignedNumber> TEST_CASES = + ImmutableList.of( + testCase(0, "00", "ff"), + testCase(1, "0101", "fefe"), + testCase(33, "0121", "fede"), + testCase(55000, "02d6d8", "fd2927"), + testCase(Integer.MAX_VALUE, "047fffffff", "fb80000000"), + testCase(Long.MAX_VALUE, "087fffffffffffffff", "f78000000000000000"), + testCase(Long.MIN_VALUE, "088000000000000000", "f77fffffffffffffff"), + testCase(-100, "08ffffffffffffff9c", "f70000000000000063"), + testCase(-1, "08ffffffffffffffff", "f70000000000000000")); + } + + @AutoValue + abstract static class BytesTest extends CodingTestCase<String> { + @Override + byte[] encodeIncreasing() { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeBytes(bytesFromHexString(value())); + return orderedCode.getEncodedBytes(); + } + + @Override + byte[] encodeDecreasing() { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeBytesDecreasing(bytesFromHexString(value())); + return orderedCode.getEncodedBytes(); + } + + @Override + String decodeIncreasing(OrderedCode orderedCode) { + return bytesToHexString(orderedCode.readBytes()); + } + + @Override + String decodeDecreasing(OrderedCode orderedCode) { + return bytesToHexString(orderedCode.readBytesDecreasing()); + } + + private static BytesTest testCase( + String value, String increasingBytes, String decreasingBytes) { + return new AutoValue_OrderedCodeTest_BytesTest( + value, increasingBytes, decreasingBytes); + } + + /** Test cases for byte arrays, in increasing order by value. */ + private static final ImmutableList<BytesTest> TEST_CASES = + ImmutableList.of( + testCase("", "0001", "fffe"), + testCase("00", "00ff0001", "ff00fffe"), + testCase("0000", "00ff00ff0001", "ff00ff00fffe"), + testCase("0001", "00ff010001", "ff00fefffe"), + testCase("0041", "00ff410001", "ff00befffe"), + testCase("00ff", "00ffff000001", "ff0000fffffe"), + testCase("01", "010001", "fefffe"), + testCase("0100", "0100ff0001", "feff00fffe"), + testCase("6f776c", "6f776c0001", "908893fffe"), + testCase("ff", "ff000001", "00fffffe"), + testCase("ff00", "ff0000ff0001", "00ffff00fffe"), + testCase("ff01", "ff00010001", "00fffefffe"), + testCase("ffff", "ff00ff000001", "00ff00fffffe"), + testCase("ffffff", "ff00ff00ff000001", "00ff00ff00fffffe")); + } + + @Test + public void testUnsignedEncoding() { + testEncoding(UnsignedNumber.TEST_CASES); + } + + @Test + public void testUnsignedDecoding() { + testDecoding(UnsignedNumber.TEST_CASES); + } + + @Test + public void testUnsignedOrdering() { + testOrdering(UnsignedNumber.TEST_CASES); + } + + @Test + public void testBytesEncoding() { + testEncoding(BytesTest.TEST_CASES); + } + + @Test + public void testBytesDecoding() { + testDecoding(BytesTest.TEST_CASES); + } + + @Test + public void testBytesOrdering() { + testOrdering(BytesTest.TEST_CASES); + } + + private void testEncoding(List<? extends CodingTestCase<?>> testCases) { + for (CodingTestCase<?> testCase : testCases) { + byte[] actualIncreasing = testCase.encodeIncreasing(); + byte[] expectedIncreasing = + bytesFromHexString(testCase.increasingBytes()); + assertEquals(0, compare(actualIncreasing, expectedIncreasing)); + + byte[] actualDecreasing = testCase.encodeDecreasing(); + byte[] expectedDecreasing = + bytesFromHexString(testCase.decreasingBytes()); + assertEquals(0, compare(actualDecreasing, expectedDecreasing)); + } + } + + private void testDecoding(List<? extends CodingTestCase<?>> testCases) { + for (CodingTestCase<?> testCase : testCases) { + assertEquals(testCase.value(), testCase.decodeIncreasing()); + assertEquals(testCase.value(), testCase.decodeDecreasing()); + } + } + + private void testOrdering(List<? extends CodingTestCase<?>> testCases) { + // This is verifiable by inspection of the C++ encodings, but it seems + // worth checking explicitly + for (int caseIndex = 0; caseIndex < testCases.size() - 1; caseIndex++) { + byte[] encodedValue = testCases.get(caseIndex).encodeIncreasing(); + byte[] nextEncodedValue = testCases.get(caseIndex + 1).encodeIncreasing(); + assertTrue(compare(encodedValue, nextEncodedValue) < 0); + + encodedValue = testCases.get(caseIndex).encodeDecreasing(); + nextEncodedValue = testCases.get(caseIndex + 1).encodeDecreasing(); + assertTrue(compare(encodedValue, nextEncodedValue) > 0); + } + } + + @Test + public void testWriteInfinity() { + OrderedCode orderedCode = new OrderedCode(); + try { + orderedCode.readInfinity(); + fail("Expected IllegalArgumentException."); + } catch (IllegalArgumentException e) { + // expected + } + orderedCode.writeInfinity(); + assertTrue(orderedCode.readInfinity()); + try { + orderedCode.readInfinity(); + fail("Expected IllegalArgumentException."); + } catch (IllegalArgumentException e) { + // expected + } + } + + @Test + public void testWriteInfinityDecreasing() { + OrderedCode orderedCode = new OrderedCode(); + try { + orderedCode.readInfinityDecreasing(); + fail("Expected IllegalArgumentException."); + } catch (IllegalArgumentException e) { + // expected + } + orderedCode.writeInfinityDecreasing(); + assertTrue(orderedCode.readInfinityDecreasing()); + try { + orderedCode.readInfinityDecreasing(); + fail("Expected IllegalArgumentException."); + } catch (IllegalArgumentException e) { + // expected + } + } + + @Test + public void testWriteBytes() { + byte[] first = { 'a', 'b', 'c'}; + byte[] second = { 'd', 'e', 'f'}; + byte[] last = { 'x', 'y', 'z'}; + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeBytes(first); + byte[] firstEncoded = orderedCode.getEncodedBytes(); + assertTrue(Arrays.equals(orderedCode.readBytes(), first)); + + orderedCode.writeBytes(first); + orderedCode.writeBytes(second); + orderedCode.writeBytes(last); + byte[] allEncoded = orderedCode.getEncodedBytes(); + assertTrue(Arrays.equals(orderedCode.readBytes(), first)); + assertTrue(Arrays.equals(orderedCode.readBytes(), second)); + assertTrue(Arrays.equals(orderedCode.readBytes(), last)); + + orderedCode = new OrderedCode(firstEncoded); + orderedCode.writeBytes(second); + orderedCode.writeBytes(last); + assertTrue(Arrays.equals(orderedCode.getEncodedBytes(), allEncoded)); + assertTrue(Arrays.equals(orderedCode.readBytes(), first)); + assertTrue(Arrays.equals(orderedCode.readBytes(), second)); + assertTrue(Arrays.equals(orderedCode.readBytes(), last)); + + orderedCode = new OrderedCode(allEncoded); + assertTrue(Arrays.equals(orderedCode.readBytes(), first)); + assertTrue(Arrays.equals(orderedCode.readBytes(), second)); + assertTrue(Arrays.equals(orderedCode.readBytes(), last)); + } + + @Test + public void testWriteBytesDecreasing() { + byte[] first = { 'a', 'b', 'c'}; + byte[] second = { 'd', 'e', 'f'}; + byte[] last = { 'x', 'y', 'z'}; + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeBytesDecreasing(first); + byte[] firstEncoded = orderedCode.getEncodedBytes(); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), first)); + + orderedCode.writeBytesDecreasing(first); + orderedCode.writeBytesDecreasing(second); + orderedCode.writeBytesDecreasing(last); + byte[] allEncoded = orderedCode.getEncodedBytes(); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), first)); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), second)); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), last)); + + orderedCode = new OrderedCode(firstEncoded); + orderedCode.writeBytesDecreasing(second); + orderedCode.writeBytesDecreasing(last); + assertTrue(Arrays.equals(orderedCode.getEncodedBytes(), allEncoded)); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), first)); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), second)); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), last)); + + orderedCode = new OrderedCode(allEncoded); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), first)); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), second)); + assertTrue(Arrays.equals(orderedCode.readBytesDecreasing(), last)); + } + + @Test + public void testWriteNumIncreasing() { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeNumIncreasing(0); + orderedCode.writeNumIncreasing(1); + orderedCode.writeNumIncreasing(Long.MIN_VALUE); + orderedCode.writeNumIncreasing(Long.MAX_VALUE); + assertEquals(0, orderedCode.readNumIncreasing()); + assertEquals(1, orderedCode.readNumIncreasing()); + assertEquals(Long.MIN_VALUE, orderedCode.readNumIncreasing()); + assertEquals(Long.MAX_VALUE, orderedCode.readNumIncreasing()); + } + + @Test + public void testWriteNumIncreasing_unsignedInt() { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeNumIncreasing(UnsignedInteger.fromIntBits(0)); + orderedCode.writeNumIncreasing(UnsignedInteger.fromIntBits(1)); + orderedCode.writeNumIncreasing(UnsignedInteger.fromIntBits(Integer.MIN_VALUE)); + orderedCode.writeNumIncreasing(UnsignedInteger.fromIntBits(Integer.MAX_VALUE)); + assertEquals(0, orderedCode.readNumIncreasing()); + assertEquals(1, orderedCode.readNumIncreasing()); + assertEquals(Long.valueOf(Integer.MAX_VALUE) + 1L, orderedCode.readNumIncreasing()); + assertEquals(Integer.MAX_VALUE, orderedCode.readNumIncreasing()); + } + + @Test + public void testWriteNumDecreasing() { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeNumDecreasing(0); + orderedCode.writeNumDecreasing(1); + orderedCode.writeNumDecreasing(Long.MIN_VALUE); + orderedCode.writeNumDecreasing(Long.MAX_VALUE); + assertEquals(0, orderedCode.readNumDecreasing()); + assertEquals(1, orderedCode.readNumDecreasing()); + assertEquals(Long.MIN_VALUE, orderedCode.readNumDecreasing()); + assertEquals(Long.MAX_VALUE, orderedCode.readNumDecreasing()); + } + + @Test + public void testWriteNumDecreasing_unsignedInt() { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeNumDecreasing(UnsignedInteger.fromIntBits(0)); + orderedCode.writeNumDecreasing(UnsignedInteger.fromIntBits(1)); + orderedCode.writeNumDecreasing(UnsignedInteger.fromIntBits(Integer.MIN_VALUE)); + orderedCode.writeNumDecreasing(UnsignedInteger.fromIntBits(Integer.MAX_VALUE)); + assertEquals(0, orderedCode.readNumDecreasing()); + assertEquals(1, orderedCode.readNumDecreasing()); + assertEquals(Long.valueOf(Integer.MAX_VALUE) + 1L, orderedCode.readNumDecreasing()); + assertEquals(Integer.MAX_VALUE, orderedCode.readNumDecreasing()); + } + + /** + * Assert that encoding the specified long via + * {@link OrderedCode#writeSignedNumIncreasing(long)} results in the bytes + * represented by the specified string of hex digits. + * E.g. assertSignedNumIncreasingEncodingEquals("3fbf", -65) asserts that + * -65 is encoded as { (byte) 0x3f, (byte) 0xbf }. + */ + private static void assertSignedNumIncreasingEncodingEquals( + String expectedHexEncoding, long num) { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeSignedNumIncreasing(num); + assertEquals( + "Unexpected encoding for " + num, + expectedHexEncoding, + bytesToHexString(orderedCode.getEncodedBytes())); + } + + /** + * Assert that encoding various long values via + * {@link OrderedCode#writeSignedNumIncreasing(long)} produces the expected + * bytes. Expected byte sequences were generated via the c++ (authoritative) + * implementation of OrderedCode::WriteSignedNumIncreasing. + */ + @Test + public void testSignedNumIncreasing_write() { + assertSignedNumIncreasingEncodingEquals( + "003f8000000000000000", Long.MIN_VALUE); + assertSignedNumIncreasingEncodingEquals( + "003f8000000000000001", Long.MIN_VALUE + 1); + assertSignedNumIncreasingEncodingEquals( + "077fffffff", Integer.MIN_VALUE - 1L); + assertSignedNumIncreasingEncodingEquals("0780000000", Integer.MIN_VALUE); + assertSignedNumIncreasingEncodingEquals( + "0780000001", Integer.MIN_VALUE + 1); + assertSignedNumIncreasingEncodingEquals("3fbf", -65); + assertSignedNumIncreasingEncodingEquals("40", -64); + assertSignedNumIncreasingEncodingEquals("41", -63); + assertSignedNumIncreasingEncodingEquals("7d", -3); + assertSignedNumIncreasingEncodingEquals("7e", -2); + assertSignedNumIncreasingEncodingEquals("7f", -1); + assertSignedNumIncreasingEncodingEquals("80", 0); + assertSignedNumIncreasingEncodingEquals("81", 1); + assertSignedNumIncreasingEncodingEquals("82", 2); + assertSignedNumIncreasingEncodingEquals("83", 3); + assertSignedNumIncreasingEncodingEquals("bf", 63); + assertSignedNumIncreasingEncodingEquals("c040", 64); + assertSignedNumIncreasingEncodingEquals("c041", 65); + assertSignedNumIncreasingEncodingEquals( + "f87ffffffe", Integer.MAX_VALUE - 1); + assertSignedNumIncreasingEncodingEquals("f87fffffff", Integer.MAX_VALUE); + assertSignedNumIncreasingEncodingEquals( + "f880000000", Integer.MAX_VALUE + 1L); + assertSignedNumIncreasingEncodingEquals( + "ffc07ffffffffffffffe", Long.MAX_VALUE - 1); + assertSignedNumIncreasingEncodingEquals( + "ffc07fffffffffffffff", Long.MAX_VALUE); + } + + /** + * Convert a string of hex digits (e.g. "3fbf") to a byte[] + * (e.g. { (byte) 0x3f, (byte) 0xbf }). + */ + private static byte[] bytesFromHexString(String hexDigits) { + return BaseEncoding.base16().lowerCase().decode(hexDigits); + } + + /** + * Convert a byte[] (e.g. { (byte) 0x3f, (byte) 0xbf }) to a string of hex + * digits (e.g. "3fbf"). + */ + private static String bytesToHexString(byte[] bytes) { + return BaseEncoding.base16().lowerCase().encode(bytes); + } + + /** + * Assert that decoding (via {@link OrderedCode#readSignedNumIncreasing()}) + * the bytes represented by the specified string of hex digits results in the + * expected long value. + * E.g. assertDecodedSignedNumIncreasingEquals(-65, "3fbf") asserts that the + * byte array { (byte) 0x3f, (byte) 0xbf } is decoded as -65. + */ + private static void assertDecodedSignedNumIncreasingEquals( + long expectedNum, String encodedHexString) { + OrderedCode orderedCode = + new OrderedCode(bytesFromHexString(encodedHexString)); + assertEquals( + "Unexpected value when decoding 0x" + encodedHexString, + expectedNum, + orderedCode.readSignedNumIncreasing()); + assertFalse( + "Unexpected encoded bytes remain after decoding 0x" + encodedHexString, + orderedCode.hasRemainingEncodedBytes()); + } + + /** + * Assert that decoding various sequences of bytes via + * {@link OrderedCode#readSignedNumIncreasing()} produces the expected long + * value. + * Input byte sequences were generated via the c++ (authoritative) + * implementation of OrderedCode::WriteSignedNumIncreasing. + */ + @Test + public void testSignedNumIncreasing_read() { + assertDecodedSignedNumIncreasingEquals( + Long.MIN_VALUE, "003f8000000000000000"); + assertDecodedSignedNumIncreasingEquals( + Long.MIN_VALUE + 1, "003f8000000000000001"); + assertDecodedSignedNumIncreasingEquals( + Integer.MIN_VALUE - 1L, "077fffffff"); + assertDecodedSignedNumIncreasingEquals(Integer.MIN_VALUE, "0780000000"); + assertDecodedSignedNumIncreasingEquals(Integer.MIN_VALUE + 1, "0780000001"); + assertDecodedSignedNumIncreasingEquals(-65, "3fbf"); + assertDecodedSignedNumIncreasingEquals(-64, "40"); + assertDecodedSignedNumIncreasingEquals(-63, "41"); + assertDecodedSignedNumIncreasingEquals(-3, "7d"); + assertDecodedSignedNumIncreasingEquals(-2, "7e"); + assertDecodedSignedNumIncreasingEquals(-1, "7f"); + assertDecodedSignedNumIncreasingEquals(0, "80"); + assertDecodedSignedNumIncreasingEquals(1, "81"); + assertDecodedSignedNumIncreasingEquals(2, "82"); + assertDecodedSignedNumIncreasingEquals(3, "83"); + assertDecodedSignedNumIncreasingEquals(63, "bf"); + assertDecodedSignedNumIncreasingEquals(64, "c040"); + assertDecodedSignedNumIncreasingEquals(65, "c041"); + assertDecodedSignedNumIncreasingEquals(Integer.MAX_VALUE - 1, "f87ffffffe"); + assertDecodedSignedNumIncreasingEquals(Integer.MAX_VALUE, "f87fffffff"); + assertDecodedSignedNumIncreasingEquals( + Integer.MAX_VALUE + 1L, "f880000000"); + assertDecodedSignedNumIncreasingEquals( + Long.MAX_VALUE - 1, "ffc07ffffffffffffffe"); + assertDecodedSignedNumIncreasingEquals( + Long.MAX_VALUE, "ffc07fffffffffffffff"); + } + + /** + * Assert that encoding (via + * {@link OrderedCode#writeSignedNumIncreasing(long)}) the specified long + * value and then decoding (via {@link OrderedCode#readSignedNumIncreasing()}) + * results in the original value. + */ + private static void assertSignedNumIncreasingWriteAndReadIsLossless( + long num) { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeSignedNumIncreasing(num); + assertEquals( + "Unexpected result when decoding writeSignedNumIncreasing(" + num + ")", + num, + orderedCode.readSignedNumIncreasing()); + assertFalse("Unexpected remaining encoded bytes after decoding " + num, + orderedCode.hasRemainingEncodedBytes()); + } + + /** + * Assert that for various long values, encoding (via + * {@link OrderedCode#writeSignedNumIncreasing(long)}) and then decoding (via + * {@link OrderedCode#readSignedNumIncreasing()}) results in the original + * value. + */ + @Test + public void testSignedNumIncreasing_writeAndRead() { + assertSignedNumIncreasingWriteAndReadIsLossless(Long.MIN_VALUE); + assertSignedNumIncreasingWriteAndReadIsLossless(Long.MIN_VALUE + 1); + assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MIN_VALUE - 1L); + assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MIN_VALUE); + assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MIN_VALUE + 1); + assertSignedNumIncreasingWriteAndReadIsLossless(-65); + assertSignedNumIncreasingWriteAndReadIsLossless(-64); + assertSignedNumIncreasingWriteAndReadIsLossless(-63); + assertSignedNumIncreasingWriteAndReadIsLossless(-3); + assertSignedNumIncreasingWriteAndReadIsLossless(-2); + assertSignedNumIncreasingWriteAndReadIsLossless(-1); + assertSignedNumIncreasingWriteAndReadIsLossless(0); + assertSignedNumIncreasingWriteAndReadIsLossless(1); + assertSignedNumIncreasingWriteAndReadIsLossless(2); + assertSignedNumIncreasingWriteAndReadIsLossless(3); + assertSignedNumIncreasingWriteAndReadIsLossless(63); + assertSignedNumIncreasingWriteAndReadIsLossless(64); + assertSignedNumIncreasingWriteAndReadIsLossless(65); + assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MAX_VALUE - 1); + assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MAX_VALUE); + assertSignedNumIncreasingWriteAndReadIsLossless(Integer.MAX_VALUE + 1L); + assertSignedNumIncreasingWriteAndReadIsLossless(Long.MAX_VALUE - 1); + assertSignedNumIncreasingWriteAndReadIsLossless(Long.MAX_VALUE); + } + + /** + * Assert that encoding (via + * {@link OrderedCode#writeSignedNumDecreasing(long)}) the specified long + * value and then decoding (via {@link OrderedCode#readSignedNumDecreasing()}) + * results in the original value. + */ + private static void assertSignedNumDecreasingWriteAndReadIsLossless( + long num) { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeSignedNumDecreasing(num); + assertEquals( + "Unexpected result when decoding writeSignedNumDecreasing(" + num + ")", + num, + orderedCode.readSignedNumDecreasing()); + assertFalse("Unexpected remaining encoded bytes after decoding " + num, + orderedCode.hasRemainingEncodedBytes()); + } + + /** + * Assert that for various long values, encoding (via + * {@link OrderedCode#writeSignedNumDecreasing(long)}) and then decoding (via + * {@link OrderedCode#readSignedNumDecreasing()}) results in the original + * value. + */ + @Test + public void testSignedNumDecreasing_writeAndRead() { + assertSignedNumDecreasingWriteAndReadIsLossless(Long.MIN_VALUE); + assertSignedNumDecreasingWriteAndReadIsLossless(Long.MIN_VALUE + 1); + assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MIN_VALUE - 1L); + assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MIN_VALUE); + assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MIN_VALUE + 1); + assertSignedNumDecreasingWriteAndReadIsLossless(-65); + assertSignedNumDecreasingWriteAndReadIsLossless(-64); + assertSignedNumDecreasingWriteAndReadIsLossless(-63); + assertSignedNumDecreasingWriteAndReadIsLossless(-3); + assertSignedNumDecreasingWriteAndReadIsLossless(-2); + assertSignedNumDecreasingWriteAndReadIsLossless(-1); + assertSignedNumDecreasingWriteAndReadIsLossless(0); + assertSignedNumDecreasingWriteAndReadIsLossless(1); + assertSignedNumDecreasingWriteAndReadIsLossless(2); + assertSignedNumDecreasingWriteAndReadIsLossless(3); + assertSignedNumDecreasingWriteAndReadIsLossless(63); + assertSignedNumDecreasingWriteAndReadIsLossless(64); + assertSignedNumDecreasingWriteAndReadIsLossless(65); + assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MAX_VALUE - 1); + assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MAX_VALUE); + assertSignedNumDecreasingWriteAndReadIsLossless(Integer.MAX_VALUE + 1L); + assertSignedNumDecreasingWriteAndReadIsLossless(Long.MAX_VALUE - 1); + assertSignedNumDecreasingWriteAndReadIsLossless(Long.MAX_VALUE); + } + + /** Ensures that numbers encoded as "decreasing" do indeed sort in reverse order. */ + @Test + public void testDecreasing() { + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeSignedNumDecreasing(10L); + byte[] ten = orderedCode.getEncodedBytes(); + orderedCode = new OrderedCode(); + orderedCode.writeSignedNumDecreasing(20L); + byte[] twenty = orderedCode.getEncodedBytes(); + // In decreasing order, twenty preceeds ten. + assertTrue(compare(twenty, ten) < 0); + } + + @Test + public void testLog2Floor_Positive() { + OrderedCode orderedCode = new OrderedCode(); + assertEquals(0, orderedCode.log2Floor(1)); + assertEquals(1, orderedCode.log2Floor(2)); + assertEquals(1, orderedCode.log2Floor(3)); + assertEquals(2, orderedCode.log2Floor(4)); + assertEquals(5, orderedCode.log2Floor(63)); + assertEquals(6, orderedCode.log2Floor(64)); + assertEquals(62, orderedCode.log2Floor(Long.MAX_VALUE)); + } + + /** + * OrderedCode.log2Floor(long) is defined to return -1 given an input of zero + * (because that's what Bits::Log2Floor64(uint64) does). + */ + @Test + public void testLog2Floor_zero() { + OrderedCode orderedCode = new OrderedCode(); + assertEquals(-1, orderedCode.log2Floor(0)); + } + + @Test + public void testLog2Floor_negative() { + OrderedCode orderedCode = new OrderedCode(); + try { + orderedCode.log2Floor(-1); + fail("Expected an IllegalArgumentException."); + } catch (IllegalArgumentException expected) { + // Expected! + } + } + + @Test + public void testGetSignedEncodingLength() { + OrderedCode orderedCode = new OrderedCode(); + assertEquals(10, orderedCode.getSignedEncodingLength(Long.MIN_VALUE)); + assertEquals(10, orderedCode.getSignedEncodingLength(~(1L << 62))); + assertEquals(9, orderedCode.getSignedEncodingLength(~(1L << 62) + 1)); + assertEquals(3, orderedCode.getSignedEncodingLength(-8193)); + assertEquals(2, orderedCode.getSignedEncodingLength(-8192)); + assertEquals(2, orderedCode.getSignedEncodingLength(-65)); + assertEquals(1, orderedCode.getSignedEncodingLength(-64)); + assertEquals(1, orderedCode.getSignedEncodingLength(-2)); + assertEquals(1, orderedCode.getSignedEncodingLength(-1)); + assertEquals(1, orderedCode.getSignedEncodingLength(0)); + assertEquals(1, orderedCode.getSignedEncodingLength(1)); + assertEquals(1, orderedCode.getSignedEncodingLength(63)); + assertEquals(2, orderedCode.getSignedEncodingLength(64)); + assertEquals(2, orderedCode.getSignedEncodingLength(8191)); + assertEquals(3, orderedCode.getSignedEncodingLength(8192)); + assertEquals(9, orderedCode.getSignedEncodingLength((1L << 62)) - 1); + assertEquals(10, orderedCode.getSignedEncodingLength(1L << 62)); + assertEquals(10, orderedCode.getSignedEncodingLength(Long.MAX_VALUE)); + } + + @Test + public void testWriteTrailingBytes() { + byte[] escapeChars = new byte[] { OrderedCode.ESCAPE1, + OrderedCode.NULL_CHARACTER, OrderedCode.SEPARATOR, OrderedCode.ESCAPE2, + OrderedCode.INFINITY, OrderedCode.FF_CHARACTER}; + byte[] anotherArray = new byte[] { 'a', 'b', 'c', 'd', 'e' }; + + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeTrailingBytes(escapeChars); + assertTrue(Arrays.equals(orderedCode.getEncodedBytes(), escapeChars)); + assertTrue(Arrays.equals(orderedCode.readTrailingBytes(), escapeChars)); + try { + orderedCode.readInfinity(); + fail("Expected IllegalArgumentException."); + } catch (IllegalArgumentException e) { + // expected + } + + orderedCode = new OrderedCode(); + orderedCode.writeTrailingBytes(anotherArray); + assertTrue(Arrays.equals(orderedCode.getEncodedBytes(), anotherArray)); + assertTrue(Arrays.equals(orderedCode.readTrailingBytes(), anotherArray)); + } + + @Test + public void testMixedWrite() { + byte[] first = { 'a', 'b', 'c'}; + byte[] second = { 'd', 'e', 'f'}; + byte[] last = { 'x', 'y', 'z'}; + byte[] escapeChars = new byte[] { OrderedCode.ESCAPE1, + OrderedCode.NULL_CHARACTER, OrderedCode.SEPARATOR, OrderedCode.ESCAPE2, + OrderedCode.INFINITY, OrderedCode.FF_CHARACTER}; + + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeBytes(first); + orderedCode.writeBytes(second); + orderedCode.writeBytes(last); + orderedCode.writeInfinity(); + orderedCode.writeNumIncreasing(0); + orderedCode.writeNumIncreasing(1); + orderedCode.writeNumIncreasing(Long.MIN_VALUE); + orderedCode.writeNumIncreasing(Long.MAX_VALUE); + orderedCode.writeSignedNumIncreasing(0); + orderedCode.writeSignedNumIncreasing(1); + orderedCode.writeSignedNumIncreasing(Long.MIN_VALUE); + orderedCode.writeSignedNumIncreasing(Long.MAX_VALUE); + orderedCode.writeTrailingBytes(escapeChars); + byte[] allEncoded = orderedCode.getEncodedBytes(); + assertTrue(Arrays.equals(orderedCode.readBytes(), first)); + assertTrue(Arrays.equals(orderedCode.readBytes(), second)); + assertFalse(orderedCode.readInfinity()); + assertTrue(Arrays.equals(orderedCode.readBytes(), last)); + assertTrue(orderedCode.readInfinity()); + assertEquals(0, orderedCode.readNumIncreasing()); + assertEquals(1, orderedCode.readNumIncreasing()); + assertFalse(orderedCode.readInfinity()); + assertEquals(Long.MIN_VALUE, orderedCode.readNumIncreasing()); + assertEquals(Long.MAX_VALUE, orderedCode.readNumIncreasing()); + assertEquals(0, orderedCode.readSignedNumIncreasing()); + assertEquals(1, orderedCode.readSignedNumIncreasing()); + assertFalse(orderedCode.readInfinity()); + assertEquals(Long.MIN_VALUE, orderedCode.readSignedNumIncreasing()); + assertEquals(Long.MAX_VALUE, orderedCode.readSignedNumIncreasing()); + assertTrue(Arrays.equals(orderedCode.getEncodedBytes(), escapeChars)); + assertTrue(Arrays.equals(orderedCode.readTrailingBytes(), escapeChars)); + + orderedCode = new OrderedCode(allEncoded); + assertTrue(Arrays.equals(orderedCode.readBytes(), first)); + assertTrue(Arrays.equals(orderedCode.readBytes(), second)); + assertFalse(orderedCode.readInfinity()); + assertTrue(Arrays.equals(orderedCode.readBytes(), last)); + assertTrue(orderedCode.readInfinity()); + assertEquals(0, orderedCode.readNumIncreasing()); + assertEquals(1, orderedCode.readNumIncreasing()); + assertFalse(orderedCode.readInfinity()); + assertEquals(Long.MIN_VALUE, orderedCode.readNumIncreasing()); + assertEquals(Long.MAX_VALUE, orderedCode.readNumIncreasing()); + assertEquals(0, orderedCode.readSignedNumIncreasing()); + assertEquals(1, orderedCode.readSignedNumIncreasing()); + assertFalse(orderedCode.readInfinity()); + assertEquals(Long.MIN_VALUE, orderedCode.readSignedNumIncreasing()); + assertEquals(Long.MAX_VALUE, orderedCode.readSignedNumIncreasing()); + assertTrue(Arrays.equals(orderedCode.getEncodedBytes(), escapeChars)); + assertTrue(Arrays.equals(orderedCode.readTrailingBytes(), escapeChars)); + } + + @Test + public void testEdgeCases() { + byte[] ffChar = {OrderedCode.FF_CHARACTER}; + byte[] nullChar = {OrderedCode.NULL_CHARACTER}; + + byte[] separatorEncoded = {OrderedCode.ESCAPE1, OrderedCode.SEPARATOR}; + byte[] ffCharEncoded = {OrderedCode.ESCAPE1, OrderedCode.NULL_CHARACTER}; + byte[] nullCharEncoded = {OrderedCode.ESCAPE2, OrderedCode.FF_CHARACTER}; + byte[] infinityEncoded = {OrderedCode.ESCAPE2, OrderedCode.INFINITY}; + + OrderedCode orderedCode = new OrderedCode(); + orderedCode.writeBytes(ffChar); + orderedCode.writeBytes(nullChar); + orderedCode.writeInfinity(); + assertTrue(Arrays.equals(orderedCode.getEncodedBytes(), + Bytes.concat(ffCharEncoded, separatorEncoded, + nullCharEncoded, separatorEncoded, + infinityEncoded))); + assertTrue(Arrays.equals(orderedCode.readBytes(), ffChar)); + assertTrue(Arrays.equals(orderedCode.readBytes(), nullChar)); + assertTrue(orderedCode.readInfinity()); + + orderedCode = new OrderedCode( + Bytes.concat(ffCharEncoded, separatorEncoded)); + assertTrue(Arrays.equals(orderedCode.readBytes(), ffChar)); + + orderedCode = new OrderedCode( + Bytes.concat(nullCharEncoded, separatorEncoded)); + assertTrue(Arrays.equals(orderedCode.readBytes(), nullChar)); + + byte[] invalidEncodingForRead = {OrderedCode.ESCAPE2, OrderedCode.ESCAPE2, + OrderedCode.ESCAPE1, OrderedCode.SEPARATOR}; + orderedCode = new OrderedCode(invalidEncodingForRead); + try { + orderedCode.readBytes(); + fail("Should have failed."); + } catch (Exception e) { + // Expected + } + assertTrue(orderedCode.hasRemainingEncodedBytes()); + } + + @Test + public void testHasRemainingEncodedBytes() { + byte[] bytes = { 'a', 'b', 'c'}; + long number = 12345; + + // Empty + OrderedCode orderedCode = new OrderedCode(); + assertFalse(orderedCode.hasRemainingEncodedBytes()); + + // First and only field of each type. + orderedCode.writeBytes(bytes); + assertTrue(orderedCode.hasRemainingEncodedBytes()); + assertTrue(Arrays.equals(orderedCode.readBytes(), bytes)); + assertFalse(orderedCode.hasRemainingEncodedBytes()); + + orderedCode.writeNumIncreasing(number); + assertTrue(orderedCode.hasRemainingEncodedBytes()); + assertEquals(orderedCode.readNumIncreasing(), number); + assertFalse(orderedCode.hasRemainingEncodedBytes()); + + orderedCode.writeSignedNumIncreasing(number); + assertTrue(orderedCode.hasRemainingEncodedBytes()); + assertEquals(orderedCode.readSignedNumIncreasing(), number); + assertFalse(orderedCode.hasRemainingEncodedBytes()); + + orderedCode.writeInfinity(); + assertTrue(orderedCode.hasRemainingEncodedBytes()); + assertTrue(orderedCode.readInfinity()); + assertFalse(orderedCode.hasRemainingEncodedBytes()); + + orderedCode.writeTrailingBytes(bytes); + assertTrue(orderedCode.hasRemainingEncodedBytes()); + assertTrue(Arrays.equals(orderedCode.readTrailingBytes(), bytes)); + assertFalse(orderedCode.hasRemainingEncodedBytes()); + + // Two fields of same type. + orderedCode.writeBytes(bytes); + orderedCode.writeBytes(bytes); + assertTrue(orderedCode.hasRemainingEncodedBytes()); + assertTrue(Arrays.equals(orderedCode.readBytes(), bytes)); + assertTrue(Arrays.equals(orderedCode.readBytes(), bytes)); + assertFalse(orderedCode.hasRemainingEncodedBytes()); + } + + @Test + public void testOrderingInfinity() { + OrderedCode inf = new OrderedCode(); + inf.writeInfinity(); + + OrderedCode negInf = new OrderedCode(); + negInf.writeInfinityDecreasing(); + + OrderedCode longValue = new OrderedCode(); + longValue.writeSignedNumIncreasing(1); + + assertTrue(compare(inf.getEncodedBytes(), negInf.getEncodedBytes()) > 0); + assertTrue(compare(longValue.getEncodedBytes(), negInf.getEncodedBytes()) > 0); + assertTrue(compare(inf.getEncodedBytes(), longValue.getEncodedBytes()) > 0); + } + + private int compare(byte[] bytes1, byte[] bytes2) { + return UnsignedBytes.lexicographicalComparator().compare(bytes1, bytes2); + } +}