This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new cae7cd9e [#804] improvement: Optimize CRC calculation of ByteBuffer (#805)
cae7cd9e is described below
commit cae7cd9e379aedca95e2590eeedfcc27fd9f56b9
Author: roryqi <[email protected]>
AuthorDate: Mon Apr 10 12:38:35 2023 +0800
[#804] improvement: Optimize CRC calculation of ByteBuffer (#805)
### What changes were proposed in this pull request?
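1. Remove unnecessary code: `HdfsFileWriter.writeHeader`, the `ShuffleIndexHeader` class, `ShuffleStorageUtils.generateAbsoluteFilePrefix` and `ShuffleStorageUtils.getHeaderCrcLen`.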
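2. Optimize `ChecksumUtils.getCrc32(ByteBuffer)`: the CRC is now computed directly on a duplicate of the buffer, in chunks of at most `LENGTH_PER_CRC` bytes. Direct buffers are therefore no longer copied into a temporary byte array, and the caller's buffer position is left unchanged. A new overload, `getCrc32(ByteBuffer, int offset, int length)`, computes the CRC of an explicit byte range.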
### Why are the changes needed?
Fix: #804
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a new unit test: `ChecksumUtilsTest.crc32ByteBufferTest`.
---
.../apache/uniffle/common/util/ChecksumUtils.java | 20 ++--
.../uniffle/common/util/ChecksumUtilsTest.java | 26 ++++-
.../storage/handler/impl/HdfsFileWriter.java | 27 -----
.../storage/handler/impl/ShuffleIndexHeader.java | 130 ---------------------
.../uniffle/storage/util/ShuffleStorageUtils.java | 13 ---
5 files changed, 38 insertions(+), 178 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java
b/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java
index f74510c6..32ecf867 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java
@@ -40,14 +40,20 @@ public class ChecksumUtils {
return crc32.getValue();
}
- // you may need to flip at first
public static long getCrc32(ByteBuffer byteBuffer) {
- if (byteBuffer.hasArray()) {
- return getCrc32(byteBuffer.array(), byteBuffer.arrayOffset() +
byteBuffer.position(), byteBuffer.remaining());
- } else {
- byte[] byteArray = new byte[byteBuffer.remaining()];
- byteBuffer.get(byteArray);
- return getCrc32(byteArray);
+ return getCrc32(byteBuffer, byteBuffer.position(), byteBuffer.limit() -
byteBuffer.position());
+ }
+
+ public static long getCrc32(ByteBuffer byteBuffer, int offset, int length) {
+ CRC32 crc32 = new CRC32();
+ ByteBuffer crcBuffer = byteBuffer.duplicate();
+ crcBuffer.position(offset);
+ for (int i = 0; i < length; ) {
+ int len = Math.min(LENGTH_PER_CRC, length - i);
+ crcBuffer.limit(crcBuffer.position() + len);
+ crc32.update(crcBuffer);
+ i += len;
}
+ return crc32.getValue();
}
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java
b/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java
index 7e24ddfe..fc822f42 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java
@@ -76,7 +76,6 @@ public class ChecksumUtilsTest {
buffer.flip();
long expectedChecksum = ChecksumUtils.getCrc32(data);
assertEquals(expectedChecksum, ChecksumUtils.getCrc32(buffer));
- assertEquals(length, buffer.position());
// test heap ByteBuffer
path = Paths.get(file.getAbsolutePath());
@@ -89,4 +88,29 @@ public class ChecksumUtilsTest {
assertEquals(expectedChecksum, ChecksumUtils.getCrc32(buffer));
}
+
+ @Test
+ public void crc32ByteBufferTest() throws Exception {
+ int length = 32 * 1024 * 1024;
+ byte[] data = new byte[length];
+ Random random = new Random();
+ random.nextBytes(data);
+ long expectCrc = ChecksumUtils.getCrc32(data);
+ ByteBuffer originBuffer = ByteBuffer.allocateDirect(length);
+ originBuffer.put(data);
+ originBuffer.flip();
+ assertEquals(expectCrc, ChecksumUtils.getCrc32(ByteBuffer.wrap(data)));
+ ByteBuffer directBuffer = ByteBuffer.allocateDirect(length);
+ directBuffer.put(data);
+ directBuffer.flip();
+ assertEquals(expectCrc, ChecksumUtils.getCrc32(directBuffer));
+ assertEquals(originBuffer, directBuffer);
+ int offset = random.nextInt(15);
+ ByteBuffer directOffsetBuffer = ByteBuffer.allocateDirect(length + offset);
+ byte[] dataOffset = new byte[offset];
+ random.nextBytes(dataOffset);
+ directOffsetBuffer.put(dataOffset);
+ directOffsetBuffer.put(data);
+ assertEquals(expectCrc, ChecksumUtils.getCrc32(directOffsetBuffer, offset,
length));
+ }
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
index 6750d6ba..5e179a6b 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
@@ -21,7 +21,6 @@ import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -31,10 +30,8 @@ import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.storage.api.FileWriter;
import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
public class HdfsFileWriter implements FileWriter, Closeable {
@@ -103,30 +100,6 @@ public class HdfsFileWriter implements FileWriter,
Closeable {
fsDataOutputStream.writeLong(segment.getTaskAttemptId());
}
- // index file header is PartitionNum | [(PartitionId | PartitionFileLength |
PartitionDataFileLength), ] | CRC
- public void writeHeader(List<Integer> partitionList,
- List<Long> indexFileSizeList,
- List<Long> dataFileSizeList) throws IOException {
- ByteBuffer headerContentBuf = ByteBuffer.allocate(
- (int)ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size()) -
ShuffleStorageUtils.getHeaderCrcLen());
- fsDataOutputStream.writeInt(partitionList.size());
- headerContentBuf.putInt(partitionList.size());
- for (int i = 0; i < partitionList.size(); i++) {
- fsDataOutputStream.writeInt(partitionList.get(i));
- fsDataOutputStream.writeLong(indexFileSizeList.get(i));
- fsDataOutputStream.writeLong(dataFileSizeList.get(i));
- headerContentBuf.putInt(partitionList.get(i));
- headerContentBuf.putLong(indexFileSizeList.get(i));
- headerContentBuf.putLong(dataFileSizeList.get(i));
- }
- headerContentBuf.flip();
- fsDataOutputStream.writeLong(ChecksumUtils.getCrc32(headerContentBuf));
- long len = ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size());
- if (fsDataOutputStream.getPos() != len) {
- throw new IOException("Fail to write index header");
- }
- }
-
public long nextOffset() {
return nextOffset;
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleIndexHeader.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleIndexHeader.java
deleted file mode 100644
index 07edff17..00000000
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleIndexHeader.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.storage.handler.impl;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
-
-public class ShuffleIndexHeader {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ShuffleIndexHeader.class);
-
- private int partitionNum;
- private List<Entry> indexes = Lists.newArrayList();
- private long crc;
-
- public ShuffleIndexHeader(int partitionNum, List<Entry> indexes, long crc) {
- this.partitionNum = partitionNum;
- this.indexes = indexes;
- this.crc = crc;
- }
-
- public void setPartitionNum(int partitionNum) {
- this.partitionNum = partitionNum;
- }
-
- public int getPartitionNum() {
- return partitionNum;
- }
-
- public List<Entry> getIndexes() {
- return indexes;
- }
-
- public long getCrc() {
- return crc;
- }
-
- public void setCrc(long crc) {
- this.crc = crc;
- }
-
- public int getHeaderLen() {
- return (int) ShuffleStorageUtils.getIndexFileHeaderLen(partitionNum);
- }
-
- // No side effects on byteBuffer
- public static ShuffleIndexHeader extractHeader(ByteBuffer byteBuffer) {
- try {
- int partitionNum = byteBuffer.getInt();
- ByteBuffer headerContentBuf = ByteBuffer.allocate(
- (int) ShuffleStorageUtils.getIndexFileHeaderLen(partitionNum)
- - ShuffleStorageUtils.getHeaderCrcLen());
- headerContentBuf.putInt(partitionNum);
- List<Entry> entries = Lists.newArrayList();
-
- for (int i = 0; i < partitionNum; i++) {
- int partitionId = byteBuffer.getInt();
- long partitionLength = byteBuffer.getLong();
- long partitionDataFileLength = byteBuffer.getLong();
- headerContentBuf.putInt(partitionId);
- headerContentBuf.putLong(partitionLength);
- headerContentBuf.putLong(partitionDataFileLength);
-
- ShuffleIndexHeader.Entry entry
- = new ShuffleIndexHeader.Entry(partitionId, partitionLength,
partitionDataFileLength);
- entries.add(entry);
- }
-
- headerContentBuf.flip();
- long crc = byteBuffer.getLong();
- long actualCrc = ChecksumUtils.getCrc32(headerContentBuf);
- if (crc != actualCrc) {
- LOG.error("Read header exception, expected crc[{}] != actual crc[{}]",
crc, actualCrc);
- return null;
- }
- // clear the side effect on byteBuffer
- byteBuffer.clear();
- return new ShuffleIndexHeader(partitionNum, entries, crc);
- } catch (Exception e) {
- LOG.error("Fail to extract header from {}, with exception",
byteBuffer.toString(), e);
- return null;
- }
- }
-
- static class Entry {
- int partitionId;
- long partitionIndexLength;
- long partitionDataLength;
-
- Entry(int partitionId, long partitionIndexLength, long
partitionDataLength) {
- this.partitionId = partitionId;
- this.partitionIndexLength = partitionIndexLength;
- this.partitionDataLength = partitionDataLength;
- }
-
- public int getPartitionId() {
- return partitionId;
- }
-
- public long getPartitionIndexLength() {
- return partitionIndexLength;
- }
-
- public long getPartitionDataLength() {
- return partitionDataLength;
- }
- }
-}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
index 4b19408f..e9abec15 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
@@ -56,15 +56,6 @@ public class ShuffleStorageUtils {
return fileNamePrefix + Constants.SHUFFLE_INDEX_FILE_SUFFIX;
}
- public static String generateAbsoluteFilePrefix(String base, String key, int
partition, String id) {
- return String.join(
- HDFS_PATH_SEPARATOR,
- base,
- key,
- String.join(HDFS_DIRNAME_SEPARATOR, String.valueOf(partition),
String.valueOf(partition)),
- id);
- }
-
public static List<DataFileSegment> mergeSegments(
String path, List<FileBasedShuffleSegment> segments, int readBufferSize)
{
List<DataFileSegment> dataFileSegments = Lists.newArrayList();
@@ -207,10 +198,6 @@ public class ShuffleStorageUtils {
return 4 + (4 + 8 + 8) * (long) partitionNum + 8;
}
- public static int getHeaderCrcLen() {
- return 8;
- }
-
public static long uploadFile(File file, HdfsFileWriter writer, int
bufferSize) throws IOException {
try (FileInputStream inputStream = new FileInputStream(file)) {
return writer.copy(inputStream, bufferSize);