This is an automated email from the ASF dual-hosted git repository. akudinkin pushed a commit to branch HUDI-4971-cancel-relocation in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 82d78409a375e80de73002744be85229e1ecfc8a Author: Alexey Kudinkin <ale...@infinilake.com> AuthorDate: Thu Oct 20 12:59:38 2022 -0700 `BinaryUtil` > `BinaryUtils`; Added utility to extract bytes from `ByteBuffer` --- .../apache/hudi/sort/SpaceCurveSortingHelper.java | 34 +++++++++++----------- .../spark/sql/hudi/execution/RangeSample.scala | 10 +++---- .../hudi/common/table/HoodieTableConfig.java | 4 +-- .../util/{BinaryUtil.java => BinaryUtils.java} | 12 +++++++- .../apache/hudi/common/util/SpillableMapUtils.java | 2 +- .../common/util/collection/BitCaskDiskMap.java | 2 +- .../{TestBinaryUtil.java => TestBinaryUtils.java} | 22 +++++++------- 7 files changed, 48 insertions(+), 38 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java index 496168e844..1ff54773c4 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/sort/SpaceCurveSortingHelper.java @@ -18,7 +18,7 @@ package org.apache.hudi.sort; -import org.apache.hudi.common.util.BinaryUtil; +import org.apache.hudi.common.util.BinaryUtils; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieClusteringConfig; import org.apache.hudi.optimize.HilbertCurveUtils; @@ -158,7 +158,7 @@ public class SpaceCurveSortingHelper { .toArray(byte[][]::new); // Interleave received bytes to produce Z-curve ordinal - byte[] zOrdinalBytes = BinaryUtil.interleaving(zBytes, 8); + byte[] zOrdinalBytes = BinaryUtils.interleaving(zBytes, 8); return appendToRow(row, zOrdinalBytes); }) .sortBy(f -> new ByteArraySorting((byte[]) f.get(fieldNum)), true, fileNum); @@ -206,30 +206,30 @@ public class SpaceCurveSortingHelper { @Nonnull private static byte[] mapColumnValueTo8Bytes(Row row, int index, DataType dataType) { if (dataType instanceof LongType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); + return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getLong(index)); } else if (dataType instanceof DoubleType) { - return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); + return BinaryUtils.doubleTo8Byte(row.isNullAt(index) ? Double.MAX_VALUE : row.getDouble(index)); } else if (dataType instanceof IntegerType) { - return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); + return BinaryUtils.intTo8Byte(row.isNullAt(index) ? Integer.MAX_VALUE : row.getInt(index)); } else if (dataType instanceof FloatType) { - return BinaryUtil.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); + return BinaryUtils.doubleTo8Byte(row.isNullAt(index) ? Float.MAX_VALUE : row.getFloat(index)); } else if (dataType instanceof StringType) { - return BinaryUtil.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); + return BinaryUtils.utf8To8Byte(row.isNullAt(index) ? "" : row.getString(index)); } else if (dataType instanceof DateType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); + return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime()); } else if (dataType instanceof TimestampType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); + return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime()); } else if (dataType instanceof ByteType) { - return BinaryUtil.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); + return BinaryUtils.byteTo8Byte(row.isNullAt(index) ? Byte.MAX_VALUE : row.getByte(index)); } else if (dataType instanceof ShortType) { - return BinaryUtil.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); + return BinaryUtils.intTo8Byte(row.isNullAt(index) ? Short.MAX_VALUE : row.getShort(index)); } else if (dataType instanceof DecimalType) { - return BinaryUtil.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); + return BinaryUtils.longTo8Byte(row.isNullAt(index) ? Long.MAX_VALUE : row.getDecimal(index).longValue()); } else if (dataType instanceof BooleanType) { boolean value = row.isNullAt(index) ? false : row.getBoolean(index); - return BinaryUtil.intTo8Byte(value ? 1 : 0); + return BinaryUtils.intTo8Byte(value ? 1 : 0); } else if (dataType instanceof BinaryType) { - return BinaryUtil.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); + return BinaryUtils.paddingTo8Byte(row.isNullAt(index) ? new byte[] {0} : (byte[]) row.get(index)); } throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName())); @@ -245,13 +245,13 @@ public class SpaceCurveSortingHelper { } else if (dataType instanceof FloatType) { return row.isNullAt(index) ? Long.MAX_VALUE : Double.doubleToLongBits((double) row.getFloat(index)); } else if (dataType instanceof StringType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertStringToLong(row.getString(index)); + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtils.convertStringToLong(row.getString(index)); } else if (dataType instanceof DateType) { return row.isNullAt(index) ? Long.MAX_VALUE : row.getDate(index).getTime(); } else if (dataType instanceof TimestampType) { return row.isNullAt(index) ? Long.MAX_VALUE : row.getTimestamp(index).getTime(); } else if (dataType instanceof ByteType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong(new byte[] {row.getByte(index)}); + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtils.convertBytesToLong(new byte[] {row.getByte(index)}); } else if (dataType instanceof ShortType) { return row.isNullAt(index) ? Long.MAX_VALUE : (long) row.getShort(index); } else if (dataType instanceof DecimalType) { @@ -260,7 +260,7 @@ public class SpaceCurveSortingHelper { boolean value = row.isNullAt(index) ? false : row.getBoolean(index); return value ? Long.MAX_VALUE : 0; } else if (dataType instanceof BinaryType) { - return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtil.convertBytesToLong((byte[]) row.get(index)); + return row.isNullAt(index) ? Long.MAX_VALUE : BinaryUtils.convertBytesToLong((byte[]) row.get(index)); } throw new UnsupportedOperationException(String.format("Unsupported data-type (%s)", dataType.typeName())); diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala index 7c39ce2546..757964c218 100644 --- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala +++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/hudi/execution/RangeSample.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.hudi.execution -import org.apache.hudi.common.util.BinaryUtil +import org.apache.hudi.common.util.BinaryUtils import org.apache.hudi.config.HoodieClusteringConfig import org.apache.hudi.config.HoodieClusteringConfig.LayoutOptimizationStrategy import org.apache.hudi.optimize.HilbertCurveUtils @@ -240,7 +240,7 @@ class RawDecisionBound[K : Ordering : ClassTag](ordering: Ordering[K]) extends S case class ByteArraySorting(b: Array[Byte]) extends Ordered[ByteArraySorting] with Serializable { override def compare(that: ByteArraySorting): Int = { val len = this.b.length - BinaryUtil.compareTo(this.b, 0, len, that.b, 0, len) + BinaryUtils.compareTo(this.b, 0, len, that.b, 0, len) } } @@ -430,7 +430,7 @@ object RangeSampleSort { case LayoutOptimizationStrategy.HILBERT => HilbertCurveUtils.indexBytes(hilbertCurve.get, values.map(_.toLong).toArray, 32) case LayoutOptimizationStrategy.ZORDER => - BinaryUtil.interleaving(values.map(BinaryUtil.intTo8Byte(_)).toArray, 8) + BinaryUtils.interleaving(values.map(BinaryUtils.intTo8Byte(_)).toArray, 8) } Row.fromSeq(row.toSeq ++ Seq(mapValues)) @@ -525,8 +525,8 @@ object RangeSampleSort { decisionBound.getBound(row, bound.asInstanceOf[Array[InternalRow]]) } } - }.toArray.map(BinaryUtil.intTo8Byte(_)) - val zValues = BinaryUtil.interleaving(interleaveValues, 8) + }.toArray.map(BinaryUtils.intTo8Byte(_)) + val zValues = BinaryUtils.interleaving(interleaveValues, 8) val mutablePair = new MutablePair[InternalRow, Array[Byte]]() mutablePair.update(unsafeRow, zValues) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java index ac3608fc00..3239cc213c 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableConfig.java @@ -34,7 +34,7 @@ import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode; import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; -import org.apache.hudi.common.util.BinaryUtil; +import org.apache.hudi.common.util.BinaryUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.StringUtils; @@ -460,7 +460,7 @@ public class HoodieTableConfig extends HoodieConfig { } String table = props.getProperty(NAME.key()); String database = props.getProperty(DATABASE_NAME.key(), ""); - return BinaryUtil.generateChecksum(String.format(TABLE_CHECKSUM_FORMAT, database, table).getBytes(UTF_8)); + return BinaryUtils.generateChecksum(String.format(TABLE_CHECKSUM_FORMAT, database, table).getBytes(UTF_8)); } public static boolean validateChecksum(Properties props) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtils.java similarity index 95% rename from hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java rename to hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtils.java index 9fec2c8cf5..96410a619e 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BinaryUtils.java @@ -18,10 +18,11 @@ package org.apache.hudi.common.util; +import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.zip.CRC32; -public class BinaryUtil { +public class BinaryUtils { /** * Lexicographically compare two arrays. @@ -117,6 +118,15 @@ public class BinaryUtil { return (byte) (a ^ (1 << (7 - apos))); } + /** + * Copies {@link ByteBuffer} into allocated {@code byte[]} array + */ + public static byte[] toBytes(ByteBuffer buffer) { + byte[] bytes = new byte[buffer.remaining()]; + buffer.get(bytes); + return bytes; + } + public static byte[] toBytes(int val) { byte[] b = new byte[4]; for (int i = 3; i > 0; i--) { diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java index d4bafd9c9f..41fd3b6951 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SpillableMapUtils.java @@ -35,7 +35,7 @@ import java.io.IOException; import java.io.RandomAccessFile; import static org.apache.hudi.avro.HoodieAvroUtils.getNullableValAsString; -import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; +import static org.apache.hudi.common.util.BinaryUtils.generateChecksum; /** * A utility class supports spillable map. diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java index 9fb0b20e74..d5a4559848 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/util/collection/BitCaskDiskMap.java @@ -56,7 +56,7 @@ import java.util.zip.Deflater; import java.util.zip.DeflaterOutputStream; import java.util.zip.InflaterInputStream; -import static org.apache.hudi.common.util.BinaryUtil.generateChecksum; +import static org.apache.hudi.common.util.BinaryUtils.generateChecksum; /** * This class provides a disk spillable only map implementation. All of the data is currenly written to one file, diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtils.java similarity index 87% rename from hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java rename to hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtils.java index 1efe5a0686..fa0140cbc3 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtil.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestBinaryUtils.java @@ -27,7 +27,7 @@ import java.util.List; import static org.junit.jupiter.api.Assertions.assertEquals; -public class TestBinaryUtil { +public class TestBinaryUtils { @Test public void testIntConvert() { @@ -37,12 +37,12 @@ public class TestBinaryUtil { List<ConvertResultWrapper<Integer>> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testInt.length; i++) { valueWrappers.add(new OrginValueWrapper<>(i, testInt[i])); - convertResultWrappers.add(new ConvertResultWrapper<>(i, BinaryUtil.intTo8Byte(testInt[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>(i, BinaryUtils.intTo8Byte(testInt[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testInt.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -57,12 +57,12 @@ public class TestBinaryUtil { List<ConvertResultWrapper<Long>> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testLong.length; i++) { valueWrappers.add(new OrginValueWrapper<>((long)i, testLong[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((long)i, BinaryUtil.longTo8Byte(testLong[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((long)i, BinaryUtils.longTo8Byte(testLong[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testLong.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -77,12 +77,12 @@ public class TestBinaryUtil { List<ConvertResultWrapper<Double>> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testDouble.length; i++) { valueWrappers.add(new OrginValueWrapper<>((Double)(i * 1.0), testDouble[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((Double)(i * 1.0), BinaryUtil.doubleTo8Byte(testDouble[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((Double)(i * 1.0), BinaryUtils.doubleTo8Byte(testDouble[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testDouble.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -97,12 +97,12 @@ public class TestBinaryUtil { List<ConvertResultWrapper<Float>> convertResultWrappers = new ArrayList<>(); for (int i = 0; i < testDouble.length; i++) { valueWrappers.add(new OrginValueWrapper<>((float)(i * 1.0), testDouble[i])); - convertResultWrappers.add(new ConvertResultWrapper<>((float)(i * 1.0), BinaryUtil.doubleTo8Byte((double) testDouble[i]))); + convertResultWrappers.add(new ConvertResultWrapper<>((float)(i * 1.0), BinaryUtils.doubleTo8Byte((double) testDouble[i]))); } Collections.sort(valueWrappers, ((o1, o2) -> o1.originValue.compareTo(o2.originValue))); - Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtil.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); + Collections.sort(convertResultWrappers, ((o1, o2) -> BinaryUtils.compareTo(o1.result, 0, o1.result.length, o2.result, 0, o2.result.length))); for (int i = 0; i < testDouble.length; i++) { assertEquals(valueWrappers.get(i).index, convertResultWrappers.get(i).index); @@ -131,7 +131,7 @@ public class TestBinaryUtil { public void testConvertBytesToLong() { long[] tests = new long[] {Long.MIN_VALUE, -1L, 0, 1L, Long.MAX_VALUE}; for (int i = 0; i < tests.length; i++) { - assertEquals(BinaryUtil.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); + assertEquals(BinaryUtils.convertBytesToLong(convertLongToBytes(tests[i])), tests[i]); } } @@ -140,7 +140,7 @@ public class TestBinaryUtil { byte[] bytes = new byte[2]; bytes[0] = 2; bytes[1] = 127; - assertEquals(BinaryUtil.convertBytesToLong(bytes), 2 * 256 + 127); + assertEquals(BinaryUtils.convertBytesToLong(bytes), 2 * 256 + 127); } private byte[] convertLongToBytes(long num) {