This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new a55c4ac61 [iceberg] Introduce iceberg bucketing function (#1409)
a55c4ac61 is described below
commit a55c4ac61bdf76a75d42b13aae585306d76075ab
Author: Yang Guo <[email protected]>
AuthorDate: Thu Aug 7 19:36:31 2025 +0800
[iceberg] Introduce iceberg bucketing function (#1409)
---
.../alibaba/fluss/bucketing/BucketingFunction.java | 2 +
.../fluss/bucketing/IcebergBucketingFunction.java | 38 +++
.../row/encode/iceberg/IcebergBinaryRowWriter.java | 69 +++---
.../bucketing/IcebergBucketingFunctionTest.java | 261 +++++++++++++++++++++
.../encode/iceberg/IcebergBinaryRowWriterTest.java | 221 +++++++++++++++++
.../row/encode/iceberg/IcebergKeyEncoderTest.java | 249 +++-----------------
6 files changed, 597 insertions(+), 243 deletions(-)
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java
b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java
index 8b06ae3c1..5fe3f6f57 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/BucketingFunction.java
@@ -41,6 +41,8 @@ public interface BucketingFunction {
return new PaimonBucketingFunction();
} else if (lakeFormat == DataLakeFormat.LANCE) {
return new FlussBucketingFunction();
+ } else if (lakeFormat == DataLakeFormat.ICEBERG) {
+ return new IcebergBucketingFunction();
} else {
throw new UnsupportedOperationException("Unsupported lake format:
" + lakeFormat);
}
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/bucketing/IcebergBucketingFunction.java
b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/IcebergBucketingFunction.java
new file mode 100644
index 000000000..0325f3480
--- /dev/null
+++
b/fluss-common/src/main/java/com/alibaba/fluss/bucketing/IcebergBucketingFunction.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.bucketing;
+
+import com.alibaba.fluss.shaded.guava32.com.google.common.hash.HashFunction;
+import com.alibaba.fluss.shaded.guava32.com.google.common.hash.Hashing;
+
+/** An implementation of {@link BucketingFunction} to follow Iceberg's
bucketing strategy. */
+public class IcebergBucketingFunction implements BucketingFunction {
+
+ private static final HashFunction MURMUR3 = Hashing.murmur3_32_fixed();
+
+ @Override
+ public int bucketing(byte[] bucketKey, int numBuckets) {
+ if (bucketKey == null || bucketKey.length == 0) {
+ throw new IllegalArgumentException("bucketKey must not be null or
empty");
+ }
+ if (numBuckets <= 0) {
+ throw new IllegalArgumentException("numBuckets must be positive");
+ }
+ return (MURMUR3.hashBytes(bucketKey).asInt() & Integer.MAX_VALUE) %
numBuckets;
+ }
+}
diff --git
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
index b18a9bfa0..4dd2ffeb9 100644
---
a/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
+++
b/fluss-common/src/main/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriter.java
@@ -27,36 +27,31 @@ import com.alibaba.fluss.utils.UnsafeUtils;
import java.io.Serializable;
import java.util.Arrays;
-import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;
-
/**
* A writer to encode Fluss's {@link com.alibaba.fluss.row.InternalRow} using
Iceberg's binary
* encoding format.
*
- * <p>The encoding logic is based on Iceberg's Conversions.toByteBuffer()
implementation:
- *
https://github.com/apache/iceberg/blob/main/api/src/main/java/org/apache/iceberg/types/Conversions.java
- *
- * <p>Key encoding principles from Iceberg's Conversions class:
+ * <p>The encoding logic is to encode different types of keys into bytes
array, which will have the
+ * same hash value with iceberg's bucketing hash function.
*
* <ul>
- * <li>All numeric types (int, long, float, double, timestamps) use
LITTLE-ENDIAN byte order
+ * <li>For int type, it is treated as a long value for encoding
+ * <li>All numeric types (long, float, double, timestamps) use LITTLE-ENDIAN
byte order
* <li>Decimal types use BIG-ENDIAN byte order
* <li>Strings are encoded as UTF-8 bytes
* <li>Timestamps are stored as long values (microseconds since epoch)
* </ul>
*
* <p>Note: This implementation uses Fluss's MemorySegment instead of
ByteBuffer for performance,
- * but maintains byte-level compatibility with Iceberg's encoding.
+ * but maintains byte-level compatibility.
*/
class IcebergBinaryRowWriter {
- private final int arity;
private byte[] buffer;
private MemorySegment segment;
private int cursor;
public IcebergBinaryRowWriter(int arity) {
- this.arity = arity;
// Conservative initial size to avoid frequent resizing
int initialSize = 8 + (arity * 8);
setBuffer(new byte[initialSize]);
@@ -64,11 +59,11 @@ class IcebergBinaryRowWriter {
}
public void reset() {
- this.cursor = 0;
// Clear only the used portion for efficiency
if (cursor > 0) {
Arrays.fill(buffer, 0, Math.min(cursor, buffer.length), (byte) 0);
}
+ this.cursor = 0;
}
public byte[] toBytes() {
@@ -77,13 +72,6 @@ class IcebergBinaryRowWriter {
return result;
}
- public void setNullAt(int pos) {
- // For Iceberg key encoding, null values should not occur
- // This is validated at the encoder level
- throw new UnsupportedOperationException(
- "Null values are not supported in Iceberg key encoding");
- }
-
public void writeBoolean(boolean value) {
ensureCapacity(1);
UnsafeUtils.putBoolean(buffer, cursor, value);
@@ -108,6 +96,10 @@ class IcebergBinaryRowWriter {
cursor += 4;
}
+ public void writeIntAsLong(int value) {
+ writeLong(value);
+ }
+
public void writeLong(long value) {
ensureCapacity(8);
UnsafeUtils.putLong(buffer, cursor, value);
@@ -127,26 +119,40 @@ class IcebergBinaryRowWriter {
}
public void writeString(BinaryString value) {
+ writeString(value, false);
+ }
+
+ public void writeString(BinaryString value, boolean skipEncodeLength) {
// Convert to UTF-8 byte array
byte[] bytes = BinaryString.encodeUTF8(value.toString());
- // Write length prefix followed by UTF-8 bytes
- writeInt(bytes.length); // 4-byte length prefix
- ensureCapacity(bytes.length); // Ensure space for actual string bytes
- segment.put(cursor, bytes, 0, bytes.length);
- cursor += bytes.length;
+ writeByteArray(bytes, skipEncodeLength);
}
public void writeBytes(byte[] bytes) {
- // Write length prefix followed by binary data
- writeInt(bytes.length); // 4-byte length prefix
+ writeBytes(bytes, false);
+ }
+
+ public void writeBytes(byte[] bytes, boolean skipEncodeLength) {
+ writeByteArray(bytes, skipEncodeLength);
+ }
+
+ private void writeByteArray(byte[] bytes, boolean skipEncodeLength) {
+ if (!skipEncodeLength) {
+ // Write length prefix followed by binary data
+ writeInt(bytes.length); // 4-byte length prefix
+ }
ensureCapacity(bytes.length); // Ensure space for actual binary bytes
segment.put(cursor, bytes, 0, bytes.length);
cursor += bytes.length;
}
- public void writeDecimal(Decimal value, int precision) {
+ public void writeDecimal(Decimal value) {
+ writeDecimal(value, false);
+ }
+
+ public void writeDecimal(Decimal value, boolean skipEncodeLength) {
byte[] unscaled = value.toUnscaledBytes();
- writeBytes(unscaled); // Adds 4-byte length prefix before the actual
bytes
+ writeBytes(unscaled, skipEncodeLength); // Adds 4-byte length prefix
before the actual bytes
}
private void ensureCapacity(int neededSize) {
@@ -178,7 +184,7 @@ class IcebergBinaryRowWriter {
switch (fieldType.getTypeRoot()) {
case INTEGER:
case DATE:
- return (writer, value) -> writer.writeInt((int) value);
+ return (writer, value) -> writer.writeIntAsLong((int) value);
case TIME_WITHOUT_TIME_ZONE:
// Write time as microseconds long (milliseconds * 1000)
@@ -199,16 +205,15 @@ class IcebergBinaryRowWriter {
};
case DECIMAL:
- final int decimalPrecision = getPrecision(fieldType);
- return (writer, value) -> writer.writeDecimal((Decimal) value,
decimalPrecision);
+ return (writer, value) -> writer.writeDecimal((Decimal) value,
true);
case STRING:
case CHAR:
- return (writer, value) -> writer.writeString((BinaryString)
value);
+ return (writer, value) -> writer.writeString((BinaryString)
value, true);
case BINARY:
case BYTES:
- return (writer, value) -> writer.writeBytes((byte[]) value);
+ return (writer, value) -> writer.writeBytes((byte[]) value,
true);
default:
throw new IllegalArgumentException(
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/bucketing/IcebergBucketingFunctionTest.java
b/fluss-common/src/test/java/com/alibaba/fluss/bucketing/IcebergBucketingFunctionTest.java
new file mode 100644
index 000000000..278136aa5
--- /dev/null
+++
b/fluss-common/src/test/java/com/alibaba/fluss/bucketing/IcebergBucketingFunctionTest.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.bucketing;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+import com.alibaba.fluss.row.GenericRow;
+import com.alibaba.fluss.row.TimestampNtz;
+import com.alibaba.fluss.row.encode.iceberg.IcebergKeyEncoder;
+import com.alibaba.fluss.types.DataType;
+import com.alibaba.fluss.types.DataTypes;
+import com.alibaba.fluss.types.RowType;
+
+import org.apache.iceberg.transforms.Transform;
+import org.apache.iceberg.transforms.Transforms;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit test for {@link IcebergBucketingFunction}. */
+class IcebergBucketingFunctionTest {
+
+ @Test
+ void testIntegerHash() throws IOException {
+ int testValue = 42;
+ int bucketNum = 10;
+
+ RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new
String[] {"id"});
+
+ GenericRow row = GenericRow.of(testValue);
+ IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType,
Collections.singletonList("id"));
+
+ // Encode with our implementation
+ byte[] ourEncodedKey = encoder.encodeKey(row);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = toBytes(testValue);
+ assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+ Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+ int icebergBucket =
transform.bind(Types.IntegerType.get()).apply(testValue);
+
+ IcebergBucketingFunction icebergBucketingFunction = new
IcebergBucketingFunction();
+ int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey,
bucketNum);
+
+ assertThat(ourBucket).isEqualTo(icebergBucket);
+ }
+
+ @Test
+ void testLongHash() throws IOException {
+ long testValue = 1234567890123456789L;
+ int bucketNum = 10;
+
+ RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new
String[] {"id"});
+
+ GenericRow row = GenericRow.of(testValue);
+ IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType,
Collections.singletonList("id"));
+
+ // Encode with our implementation
+ byte[] ourEncodedKey = encoder.encodeKey(row);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = toBytes(testValue);
+ assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+ Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+ int icebergBucket =
transform.bind(Types.LongType.get()).apply(testValue);
+
+ IcebergBucketingFunction icebergBucketingFunction = new
IcebergBucketingFunction();
+ int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey,
bucketNum);
+
+ assertThat(ourBucket).isEqualTo(icebergBucket);
+ }
+
+ @Test
+ void testStringHash() throws IOException {
+ String testValue = "Hello Iceberg, Fluss this side!";
+ int bucketNum = 10;
+
+ RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new
String[] {"name"});
+
+ GenericRow row = GenericRow.of(BinaryString.fromString(testValue));
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("name"));
+
+ // Encode with our implementation
+ byte[] ourEncodedKey = encoder.encodeKey(row);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = testValue.getBytes(StandardCharsets.UTF_8);
+ assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+ Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+ int icebergBucket =
transform.bind(Types.StringType.get()).apply(testValue);
+
+ IcebergBucketingFunction icebergBucketingFunction = new
IcebergBucketingFunction();
+ int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey,
bucketNum);
+
+ assertThat(ourBucket).isEqualTo(icebergBucket);
+ }
+
+ @Test
+ void testDecimalHash() throws IOException {
+ BigDecimal testValue = new BigDecimal("123.45");
+ Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2);
+ int bucketNum = 10;
+
+ RowType rowType =
+ RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new
String[] {"amount"});
+
+ GenericRow row = GenericRow.of(decimal);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("amount"));
+
+ // Encode with our implementation
+ byte[] ourEncodedKey = encoder.encodeKey(row);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = testValue.unscaledValue().toByteArray();
+ assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+ Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+ int icebergBucket = transform.bind(Types.DecimalType.of(10,
2)).apply(testValue);
+
+ IcebergBucketingFunction icebergBucketingFunction = new
IcebergBucketingFunction();
+ int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey,
bucketNum);
+
+ assertThat(ourBucket).isEqualTo(icebergBucket);
+ }
+
+ @Test
+ void testTimestampEncodingHash() throws IOException {
+ // Iceberg expects microseconds for TIMESTAMP type
+ long millis = 1698235273182L;
+ int nanos = 123456;
+ long micros = millis * 1000 + (nanos / 1000);
+ TimestampNtz testValue = TimestampNtz.fromMillis(millis, nanos);
+ int bucketNum = 10;
+
+ RowType rowType =
+ RowType.of(new DataType[] {DataTypes.TIMESTAMP(6)}, new
String[] {"event_time"});
+
+ GenericRow row = GenericRow.of(testValue);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("event_time"));
+
+ // Encode with our implementation
+ byte[] ourEncodedKey = encoder.encodeKey(row);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = toBytes(micros);
+ assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+ Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+ int icebergBucket =
transform.bind(Types.TimestampType.withoutZone()).apply(micros);
+
+ IcebergBucketingFunction icebergBucketingFunction = new
IcebergBucketingFunction();
+ int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey,
bucketNum);
+
+ assertThat(ourBucket).isEqualTo(icebergBucket);
+ }
+
+ @Test
+ void testDateHash() throws IOException {
+ int dateValue = 19655;
+ int bucketNum = 10;
+
+ RowType rowType = RowType.of(new DataType[] {DataTypes.DATE()}, new
String[] {"date"});
+ GenericRow row = GenericRow.of(dateValue);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("date"));
+
+ // Encode with our implementation
+ byte[] ourEncodedKey = encoder.encodeKey(row);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = toBytes(dateValue);
+ assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+ Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+ int icebergBucket =
transform.bind(Types.DateType.get()).apply(dateValue);
+
+ IcebergBucketingFunction icebergBucketingFunction = new
IcebergBucketingFunction();
+ int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey,
bucketNum);
+
+ assertThat(ourBucket).isEqualTo(icebergBucket);
+ }
+
+ @Test
+ void testTimeHashing() throws IOException {
+ // Fluss stores time as int (milliseconds since midnight)
+ int timeMillis = 34200000;
+ long timeMicros = timeMillis * 1000L; // Convert to microseconds for
Iceberg
+ int bucketNum = 10;
+
+ RowType rowType = RowType.of(new DataType[] {DataTypes.TIME()}, new
String[] {"time"});
+
+ GenericRow row = GenericRow.of(timeMillis);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("time"));
+
+ // Encode with our implementation
+ byte[] ourEncodedKey = encoder.encodeKey(row);
+ byte[] equivalentBytes = toBytes(timeMicros);
+ assertThat(ourEncodedKey).isEqualTo(equivalentBytes);
+
+ Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+ int icebergBucket =
transform.bind(Types.TimeType.get()).apply(timeMicros);
+
+ IcebergBucketingFunction icebergBucketingFunction = new
IcebergBucketingFunction();
+ int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey,
bucketNum);
+
+ assertThat(ourBucket).isEqualTo(icebergBucket);
+ }
+
+ @Test
+ void testBinaryEncoding() throws IOException {
+ byte[] testValue = "Hello i only understand binary data".getBytes();
+ int bucketNum = 10;
+
+ RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new
String[] {"data"});
+
+ GenericRow row = GenericRow.of(testValue);
+ IcebergKeyEncoder encoder =
+ new IcebergKeyEncoder(rowType,
Collections.singletonList("data"));
+
+ // Encode with our implementation
+ byte[] ourEncodedKey = encoder.encodeKey(row);
+ assertThat(ourEncodedKey).isEqualTo(testValue);
+
+ Transform<Object, Integer> transform = Transforms.bucket(bucketNum);
+ ByteBuffer byteBuffer = ByteBuffer.wrap(testValue);
+ int icebergBucket =
transform.bind(Types.BinaryType.get()).apply(byteBuffer);
+
+ IcebergBucketingFunction icebergBucketingFunction = new
IcebergBucketingFunction();
+ int ourBucket = icebergBucketingFunction.bucketing(ourEncodedKey,
bucketNum);
+
+ assertThat(ourBucket).isEqualTo(icebergBucket);
+ }
+
+ private byte[] toBytes(long value) {
+ return
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array();
+ }
+}
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
new file mode 100644
index 000000000..8d2109301
--- /dev/null
+++
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergBinaryRowWriterTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.row.encode.iceberg;
+
+import com.alibaba.fluss.row.BinaryString;
+import com.alibaba.fluss.row.Decimal;
+
+import org.junit.jupiter.api.Test;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Unit tests for {@link IcebergBinaryRowWriter}. */
+class IcebergBinaryRowWriterTest {
+
+ @Test
+ void testWriteBoolean() {
+ boolean value = false;
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeBoolean(value);
+
+ byte[] writerBytes = writer.toBytes();
+
+ ByteBuffer buffer = ByteBuffer.allocate(1);
+ buffer.put((byte) (value ? 1 : 0));
+ byte[] expectedBytes = buffer.array();
+ assertThat(writerBytes).isEqualTo(expectedBytes);
+ }
+
+ @Test
+ void testWriteByte() {
+ byte value = 1;
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeByte(value);
+
+ byte[] writerBytes = writer.toBytes();
+
+ ByteBuffer buffer = ByteBuffer.allocate(1);
+ buffer.put((byte) (value));
+ byte[] expectedBytes = buffer.array();
+ assertThat(writerBytes).isEqualTo(expectedBytes);
+ }
+
+ @Test
+ void testWriteShort() {
+ short value = 20;
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeShort(value);
+
+ byte[] writerBytes = writer.toBytes();
+
+ ByteBuffer buffer = ByteBuffer.allocate(2);
+ buffer.put((byte) value);
+ byte[] expectedBytes = buffer.array();
+ assertThat(writerBytes).isEqualTo(expectedBytes);
+ }
+
+ @Test
+ void testWriteInt() {
+ int value = 1234;
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeInt(value);
+
+ byte[] writerBytes = writer.toBytes();
+
+ ByteBuffer buffer =
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+ buffer.putInt(value);
+ byte[] expectedBytes = buffer.array();
+ assertThat(writerBytes).isEqualTo(expectedBytes);
+ }
+
+ @Test
+ void testWriteIntAsLong() {
+ int value = 1234;
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeIntAsLong(value);
+
+ byte[] writerBytes = writer.toBytes();
+
+ ByteBuffer buffer =
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+ buffer.putLong(value);
+ byte[] expectedBytes = buffer.array();
+ assertThat(writerBytes).isEqualTo(expectedBytes);
+ }
+
+ @Test
+ void testWriteLong() {
+ long value = 123456899122345678L;
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeLong(value);
+
+ byte[] writerBytes = writer.toBytes();
+
+ ByteBuffer buffer =
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+ buffer.putLong(value);
+ byte[] expectedBytes = buffer.array();
+ assertThat(writerBytes).isEqualTo(expectedBytes);
+ }
+
+ @Test
+ void testWriteFloat() {
+ float value = 3.14f;
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeFloat(value);
+
+ byte[] writerBytes = writer.toBytes();
+
+ ByteBuffer buffer =
ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+ buffer.putFloat(value);
+ byte[] expectedBytes = buffer.array();
+ assertThat(writerBytes).isEqualTo(expectedBytes);
+ }
+
+ @Test
+ void testWriteDouble() {
+ double value = 3.1415926;
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeDouble(value);
+
+ byte[] writerBytes = writer.toBytes();
+
+ ByteBuffer buffer =
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
+ buffer.putDouble(value);
+ byte[] expectedBytes = buffer.array();
+ assertThat(writerBytes).isEqualTo(expectedBytes);
+ }
+
+ @Test
+ void testWriteString() {
+ String value = "Hello World!";
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeString(BinaryString.fromString(value));
+ byte[] writerBytes = writer.toBytes();
+ byte[] strBytes = value.getBytes(StandardCharsets.UTF_8);
+
+ int length = ByteBuffer.wrap(writerBytes, 0,
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ byte[] actualContent = Arrays.copyOfRange(writerBytes, 4, 4 + length);
+
+ assertThat(length).isEqualTo(strBytes.length);
+ assertThat(actualContent).isEqualTo(strBytes);
+
+ writer.reset();
+
+ writer.writeString(BinaryString.fromString(value), true);
+ byte[] writerBytesWithoutPrefix = writer.toBytes();
+ assertThat(writerBytesWithoutPrefix).isEqualTo(strBytes);
+ }
+
+ @Test
+ void testWriteBytes() {
+ byte[] value = new byte[] {12, 23, 34, 45};
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeBytes(value);
+ byte[] writerBytes = writer.toBytes();
+
+ int length = ByteBuffer.wrap(writerBytes, 0,
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ byte[] actualContent = Arrays.copyOfRange(writerBytes, 4, 4 + length);
+
+ assertThat(length).isEqualTo(value.length);
+ assertThat(actualContent).isEqualTo(value);
+
+ writer.reset();
+
+ writer.writeBytes(value, true);
+ byte[] writerBytesWithoutPrefix = writer.toBytes();
+ assertThat(writerBytesWithoutPrefix).isEqualTo(value);
+ }
+
+ @Test
+ void testWriteDecimal() {
+ BigDecimal bigDecimal = new BigDecimal("123.45");
+ Decimal value = Decimal.fromBigDecimal(bigDecimal, 10, 2);
+
+ IcebergBinaryRowWriter writer = new IcebergBinaryRowWriter(1);
+ writer.writeDecimal(value);
+ byte[] writerBytes = writer.toBytes();
+ byte[] expectedBytes = bigDecimal.unscaledValue().toByteArray();
+
+ int length = ByteBuffer.wrap(writerBytes, 0,
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
+ byte[] actualContent = Arrays.copyOfRange(writerBytes, 4, 4 + length);
+
+ assertThat(length).isEqualTo(expectedBytes.length);
+ assertThat(actualContent).isEqualTo(expectedBytes);
+
+ writer.reset();
+
+ writer.writeDecimal(value, true);
+ byte[] writerBytesWithoutPrefix = writer.toBytes();
+ assertThat(writerBytesWithoutPrefix).isEqualTo(expectedBytes);
+ }
+}
diff --git
a/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
index 2934f85e8..14f0cf1b6 100644
---
a/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
+++
b/fluss-common/src/test/java/com/alibaba/fluss/row/encode/iceberg/IcebergKeyEncoderTest.java
@@ -19,16 +19,13 @@ package com.alibaba.fluss.row.encode.iceberg;
import com.alibaba.fluss.row.BinaryString;
import com.alibaba.fluss.row.Decimal;
-import com.alibaba.fluss.row.TimestampLtz;
+import com.alibaba.fluss.row.GenericRow;
import com.alibaba.fluss.row.TimestampNtz;
-import com.alibaba.fluss.row.indexed.IndexedRow;
-import com.alibaba.fluss.row.indexed.IndexedRowWriter;
import com.alibaba.fluss.types.DataType;
import com.alibaba.fluss.types.DataTypes;
import com.alibaba.fluss.types.RowType;
import org.apache.iceberg.types.Conversions;
-import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.junit.jupiter.api.Test;
@@ -36,19 +33,14 @@ import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
+import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
-import static com.alibaba.fluss.types.DataTypeChecks.getPrecision;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
-/**
- * Unit tests for {@link IcebergKeyEncoder} to verify the encoding matches
Iceberg's format.
- *
- * <p>This test uses Iceberg's actual Conversions class to ensure our encoding
is byte-for-byte
- * compatible with Iceberg's implementation.
- */
+/** Unit tests for {@link IcebergKeyEncoder} to verify the encoding matches
Iceberg's format. */
class IcebergKeyEncoderTest {
@Test
@@ -73,17 +65,15 @@ class IcebergKeyEncoderTest {
RowType rowType = RowType.of(new DataType[] {DataTypes.INT()}, new
String[] {"id"});
int testValue = 42;
- IndexedRow row = createRowWithInt(testValue);
+ GenericRow row = GenericRow.of(testValue);
IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType,
Collections.singletonList("id"));
// Encode with our implementation
byte[] ourEncoded = encoder.encodeKey(row);
- // Encode with Iceberg's implementation
- ByteBuffer icebergBuffer =
Conversions.toByteBuffer(Types.IntegerType.get(), testValue);
- byte[] icebergEncoded = toByteArray(icebergBuffer);
-
- assertThat(ourEncoded).isEqualTo(icebergEncoded);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = toBytes(testValue);
+ assertThat(ourEncoded).isEqualTo(equivalentBytes);
}
@Test
@@ -91,12 +81,16 @@ class IcebergKeyEncoderTest {
RowType rowType = RowType.of(new DataType[] {DataTypes.BIGINT()}, new
String[] {"id"});
long testValue = 1234567890123456789L;
- IndexedRow row = createRowWithLong(testValue);
+ GenericRow row = GenericRow.of(testValue);
IcebergKeyEncoder encoder = new IcebergKeyEncoder(rowType,
Collections.singletonList("id"));
// Encode with our implementation
byte[] ourEncoded = encoder.encodeKey(row);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = toBytes(testValue);
+ assertThat(ourEncoded).isEqualTo(equivalentBytes);
+
// Encode with Iceberg's implementation
ByteBuffer icebergBuffer =
Conversions.toByteBuffer(Types.LongType.get(), testValue);
byte[] icebergEncoded = toByteArray(icebergBuffer);
@@ -109,24 +103,16 @@ class IcebergKeyEncoderTest {
RowType rowType = RowType.of(new DataType[] {DataTypes.STRING()}, new
String[] {"name"});
String testValue = "Hello Iceberg, Fluss this side!";
- IndexedRow row = createRowWithString(testValue);
+ GenericRow row = GenericRow.of(BinaryString.fromString(testValue));
IcebergKeyEncoder encoder =
new IcebergKeyEncoder(rowType,
Collections.singletonList("name"));
// Encode with our implementation
byte[] ourEncoded = encoder.encodeKey(row);
- // Decode length prefix
- int length = ByteBuffer.wrap(ourEncoded, 0,
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
- byte[] actualContent = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
-
- // Encode with Iceberg's Conversions
- byte[] expectedContent =
- toByteArray(Conversions.toByteBuffer(Types.StringType.get(),
testValue));
-
- // Validate length and content
- assertThat(length).isEqualTo(expectedContent.length);
- assertThat(actualContent).isEqualTo(expectedContent);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = testValue.getBytes(StandardCharsets.UTF_8);
+ assertThat(ourEncoded).isEqualTo(equivalentBytes);
}
@Test
@@ -135,25 +121,17 @@ class IcebergKeyEncoderTest {
RowType.of(new DataType[] {DataTypes.DECIMAL(10, 2)}, new
String[] {"amount"});
BigDecimal testValue = new BigDecimal("123.45");
- IndexedRow row = createRowWithDecimal(testValue, 10, 2);
+ Decimal decimal = Decimal.fromBigDecimal(testValue, 10, 2);
+ GenericRow row = GenericRow.of(decimal);
IcebergKeyEncoder encoder =
new IcebergKeyEncoder(rowType,
Collections.singletonList("amount"));
// Encode with our implementation
byte[] ourEncoded = encoder.encodeKey(row);
- // Extract the decimal length prefix and bytes from ourEncoded
- int length = ByteBuffer.wrap(ourEncoded, 0,
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
- byte[] actualDecimal = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
-
- // Encode the same value with Iceberg's implementation (no prefix)
- Type.PrimitiveType decimalType = Types.DecimalType.of(10, 2);
- ByteBuffer icebergBuffer = Conversions.toByteBuffer(decimalType,
testValue);
- byte[] icebergEncoded = toByteArray(icebergBuffer);
-
- // Validate that our content (excluding the prefix) matches Iceberg's
encoding
- assertThat(length).isEqualTo(icebergEncoded.length);
- assertThat(actualDecimal).isEqualTo(icebergEncoded);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = testValue.unscaledValue().toByteArray();
+ assertThat(ourEncoded).isEqualTo(equivalentBytes);
}
@Test
@@ -166,19 +144,16 @@ class IcebergKeyEncoderTest {
int nanos = 123456;
long micros = millis * 1000 + (nanos / 1000);
- IndexedRow row = createRowWithTimestampNtz(millis, nanos);
+ TimestampNtz testValue = TimestampNtz.fromMillis(millis, nanos);
+ GenericRow row = GenericRow.of(testValue);
IcebergKeyEncoder encoder =
new IcebergKeyEncoder(rowType,
Collections.singletonList("event_time"));
// Encode with our implementation
byte[] ourEncoded = encoder.encodeKey(row);
-
- // Encode with Iceberg's implementation
- ByteBuffer icebergBuffer =
- Conversions.toByteBuffer(Types.TimestampType.withoutZone(),
micros);
- byte[] icebergEncoded = toByteArray(icebergBuffer);
-
- assertThat(ourEncoded).isEqualTo(icebergEncoded);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = toBytes(micros);
+ assertThat(ourEncoded).isEqualTo(equivalentBytes);
}
@Test
@@ -187,18 +162,16 @@ class IcebergKeyEncoderTest {
// Date value as days since epoch
int dateValue = 19655; // 2023-10-25
- IndexedRow row = createRowWithDate(dateValue);
+ GenericRow row = GenericRow.of(dateValue);
IcebergKeyEncoder encoder =
new IcebergKeyEncoder(rowType,
Collections.singletonList("date"));
// Encode with our implementation
byte[] ourEncoded = encoder.encodeKey(row);
- // Encode with Iceberg's implementation
- ByteBuffer icebergBuffer =
Conversions.toByteBuffer(Types.DateType.get(), dateValue);
- byte[] icebergEncoded = toByteArray(icebergBuffer);
-
- assertThat(ourEncoded).isEqualTo(icebergEncoded);
+ // This is the equivalent bytes array which the key should be encoded
to.
+ byte[] equivalentBytes = toBytes(dateValue);
+ assertThat(ourEncoded).isEqualTo(equivalentBytes);
}
@Test
@@ -209,18 +182,15 @@ class IcebergKeyEncoderTest {
int timeMillis = 34200000;
long timeMicros = timeMillis * 1000L; // Convert to microseconds for
Iceberg
- IndexedRow row = createRowWithTime(timeMillis);
+ GenericRow row = GenericRow.of(timeMillis);
+
IcebergKeyEncoder encoder =
new IcebergKeyEncoder(rowType,
Collections.singletonList("time"));
// Encode with our implementation
byte[] ourEncoded = encoder.encodeKey(row);
-
- // Encode with Iceberg's implementation (expects microseconds as long)
- ByteBuffer icebergBuffer =
Conversions.toByteBuffer(Types.TimeType.get(), timeMicros);
- byte[] icebergEncoded = toByteArray(icebergBuffer);
-
- assertThat(ourEncoded).isEqualTo(icebergEncoded);
+ byte[] equivalentBytes = toBytes(timeMicros);
+ assertThat(ourEncoded).isEqualTo(equivalentBytes);
}
@Test
@@ -228,25 +198,13 @@ class IcebergKeyEncoderTest {
RowType rowType = RowType.of(new DataType[] {DataTypes.BYTES()}, new
String[] {"data"});
byte[] testValue = "Hello i only understand binary data".getBytes();
- IndexedRow row = createRowWithBytes(testValue);
+ GenericRow row = GenericRow.of(testValue);
IcebergKeyEncoder encoder =
new IcebergKeyEncoder(rowType,
Collections.singletonList("data"));
// Encode with our implementation
byte[] ourEncoded = encoder.encodeKey(row);
-
- // Decode length prefix
- int length = ByteBuffer.wrap(ourEncoded, 0,
4).order(ByteOrder.LITTLE_ENDIAN).getInt();
- byte[] actualContent = Arrays.copyOfRange(ourEncoded, 4, 4 + length);
-
- // Encode using Iceberg's Conversions (input should be ByteBuffer)
- ByteBuffer icebergBuffer =
- Conversions.toByteBuffer(Types.BinaryType.get(),
ByteBuffer.wrap(testValue));
- byte[] expectedContent = toByteArray(icebergBuffer);
-
- // Validate length and content
- assertThat(length).isEqualTo(expectedContent.length);
- assertThat(actualContent).isEqualTo(expectedContent);
+ assertThat(ourEncoded).isEqualTo(testValue);
}
// Helper method to convert ByteBuffer to byte array
@@ -256,138 +214,7 @@ class IcebergKeyEncoderTest {
return array;
}
- // ---- Helper methods to create IndexedRow instances ----
-
- private IndexedRow createRowWithInt(int value) throws IOException {
- DataType[] dataTypes = {DataTypes.INT()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeInt(value);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithLong(long value) throws IOException {
- DataType[] dataTypes = {DataTypes.BIGINT()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeLong(value);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithString(String value) throws IOException {
- DataType[] dataTypes = {DataTypes.STRING()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeString(BinaryString.fromString(value));
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithBoolean(boolean value) throws IOException {
- DataType[] dataTypes = {DataTypes.BOOLEAN()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeBoolean(value);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithFloat(float value) throws IOException {
- DataType[] dataTypes = {DataTypes.FLOAT()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeFloat(value);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithDouble(double value) throws IOException {
- DataType[] dataTypes = {DataTypes.DOUBLE()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeDouble(value);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithDecimal(BigDecimal value, int precision,
int scale)
- throws IOException {
- DataType[] dataTypes = {DataTypes.DECIMAL(precision, scale)};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeDecimal(Decimal.fromBigDecimal(value, precision,
scale), precision);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithTimestampNtz(long millis, int nanos)
throws IOException {
- DataType[] dataTypes = {DataTypes.TIMESTAMP(6)};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos),
6);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithTimestampLtz(long millis, int nanos)
throws IOException {
- DataType[] dataTypes = {DataTypes.TIMESTAMP_LTZ(6)};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeTimestampLtz(TimestampLtz.fromEpochMillis(millis,
nanos), 6);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithDate(int days) throws IOException {
- DataType[] dataTypes = {DataTypes.DATE()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeInt(days);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithTime(int millis) throws IOException {
- DataType[] dataTypes = {DataTypes.TIME()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeInt(millis); // Fluss stores TIME as int (milliseconds)
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- writer.close();
- return row;
- }
- }
-
- private IndexedRow createRowWithBytes(byte[] value) throws IOException {
- DataType[] dataTypes = {DataTypes.BYTES()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeBytes(value);
- IndexedRow row = new IndexedRow(dataTypes);
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
- }
-
- private IndexedRow createRowWithTimestampNtz(long millis, int nanos,
DataType type)
- throws IOException {
- DataType[] dataTypes = {DataTypes.BYTES()};
- try (IndexedRowWriter writer = new IndexedRowWriter(dataTypes)) {
- writer.writeTimestampNtz(TimestampNtz.fromMillis(millis, nanos),
getPrecision(type));
- IndexedRow row = new IndexedRow(new DataType[] {type});
- row.pointTo(writer.segment(), 0, writer.position());
- return row;
- }
+ private byte[] toBytes(long value) {
+ return
ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN).putLong(value).array();
}
}