This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new cae7cd9e [#804] improvement: Optimize CRC calculation of ByteBuffer (#805)
cae7cd9e is described below

commit cae7cd9e379aedca95e2590eeedfcc27fd9f56b9
Author: roryqi <[email protected]>
AuthorDate: Mon Apr 10 12:38:35 2023 +0800

    [#804] improvement: Optimize CRC calculation of ByteBuffer (#805)
    
    ### What changes were proposed in this pull request?
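    1. Remove unnecessary methods: `HdfsFileWriter#writeHeader`, the `ShuffleIndexHeader`
       class, and `ShuffleStorageUtils#generateAbsoluteFilePrefix` / `#getHeaderCrcLen`.
    2. Optimize `ChecksumUtils#getCrc32(ByteBuffer)`: compute the CRC32 on a duplicate of the
       buffer instead of copying non-array (e.g. direct) buffers into a temporary byte array,
       and add an overload that takes an explicit offset and length. The caller's buffer
       position is no longer advanced by the calculation.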
    
    ### Why are the changes needed?
    
    Fix: #804
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added a new unit test, `ChecksumUtilsTest#crc32ByteBufferTest`.
---
 .../apache/uniffle/common/util/ChecksumUtils.java  |  20 ++--
 .../uniffle/common/util/ChecksumUtilsTest.java     |  26 ++++-
 .../storage/handler/impl/HdfsFileWriter.java       |  27 -----
 .../storage/handler/impl/ShuffleIndexHeader.java   | 130 ---------------------
 .../uniffle/storage/util/ShuffleStorageUtils.java  |  13 ---
 5 files changed, 38 insertions(+), 178 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java b/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java
index f74510c6..32ecf867 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/ChecksumUtils.java
@@ -40,14 +40,20 @@ public class ChecksumUtils {
     return crc32.getValue();
   }
 
-  // you may need to flip at first
   public static long getCrc32(ByteBuffer byteBuffer) {
-    if (byteBuffer.hasArray()) {
-      return getCrc32(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), byteBuffer.remaining());
-    } else {
-      byte[] byteArray = new byte[byteBuffer.remaining()];
-      byteBuffer.get(byteArray);
-      return getCrc32(byteArray);
+    return getCrc32(byteBuffer, byteBuffer.position(), byteBuffer.limit() - byteBuffer.position());
+  }
+
+  public static long getCrc32(ByteBuffer byteBuffer, int offset, int length) {
+    CRC32 crc32 = new CRC32();
+    ByteBuffer crcBuffer = byteBuffer.duplicate();
+    crcBuffer.position(offset);
+    for (int i = 0; i < length; ) {
+      int len = Math.min(LENGTH_PER_CRC, length - i);
+      crcBuffer.limit(crcBuffer.position() + len);
+      crc32.update(crcBuffer);
+      i += len;
     }
+    return crc32.getValue();
   }
 }
diff --git a/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java b/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java
index 7e24ddfe..fc822f42 100644
--- a/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/util/ChecksumUtilsTest.java
@@ -76,7 +76,6 @@ public class ChecksumUtilsTest {
     buffer.flip();
     long expectedChecksum = ChecksumUtils.getCrc32(data);
     assertEquals(expectedChecksum, ChecksumUtils.getCrc32(buffer));
-    assertEquals(length, buffer.position());
 
     // test heap ByteBuffer
     path = Paths.get(file.getAbsolutePath());
@@ -89,4 +88,29 @@ public class ChecksumUtilsTest {
     assertEquals(expectedChecksum, ChecksumUtils.getCrc32(buffer));
 
   }
+
+  @Test
+  public void crc32ByteBufferTest() throws Exception {
+    int length = 32 * 1024 * 1024;
+    byte[] data = new byte[length];
+    Random random = new Random();
+    random.nextBytes(data);
+    long expectCrc = ChecksumUtils.getCrc32(data);
+    ByteBuffer originBuffer = ByteBuffer.allocateDirect(length);
+    originBuffer.put(data);
+    originBuffer.flip();
+    assertEquals(expectCrc, ChecksumUtils.getCrc32(ByteBuffer.wrap(data)));
+    ByteBuffer directBuffer = ByteBuffer.allocateDirect(length);
+    directBuffer.put(data);
+    directBuffer.flip();
+    assertEquals(expectCrc, ChecksumUtils.getCrc32(directBuffer));
+    assertEquals(originBuffer, directBuffer);
+    int offset = random.nextInt(15);
+    ByteBuffer directOffsetBuffer = ByteBuffer.allocateDirect(length + offset);
+    byte[] dataOffset = new byte[offset];
+    random.nextBytes(dataOffset);
+    directOffsetBuffer.put(dataOffset);
+    directOffsetBuffer.put(data);
+    assertEquals(expectCrc, ChecksumUtils.getCrc32(directOffsetBuffer, offset, length));
+  }
 }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
index 6750d6ba..5e179a6b 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HdfsFileWriter.java
@@ -21,7 +21,6 @@ import java.io.Closeable;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
@@ -31,10 +30,8 @@ import org.apache.hadoop.io.IOUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.uniffle.common.util.ChecksumUtils;
 import org.apache.uniffle.storage.api.FileWriter;
 import org.apache.uniffle.storage.common.FileBasedShuffleSegment;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 
 public class HdfsFileWriter implements FileWriter, Closeable {
 
@@ -103,30 +100,6 @@ public class HdfsFileWriter implements FileWriter, Closeable {
     fsDataOutputStream.writeLong(segment.getTaskAttemptId());
   }
 
-  // index file header is PartitionNum | [(PartitionId | PartitionFileLength | PartitionDataFileLength), ] | CRC
-  public void writeHeader(List<Integer> partitionList,
-      List<Long> indexFileSizeList,
-      List<Long> dataFileSizeList) throws IOException {
-    ByteBuffer headerContentBuf = ByteBuffer.allocate(
-        (int)ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size()) - ShuffleStorageUtils.getHeaderCrcLen());
-    fsDataOutputStream.writeInt(partitionList.size());
-    headerContentBuf.putInt(partitionList.size());
-    for (int i = 0; i < partitionList.size(); i++) {
-      fsDataOutputStream.writeInt(partitionList.get(i));
-      fsDataOutputStream.writeLong(indexFileSizeList.get(i));
-      fsDataOutputStream.writeLong(dataFileSizeList.get(i));
-      headerContentBuf.putInt(partitionList.get(i));
-      headerContentBuf.putLong(indexFileSizeList.get(i));
-      headerContentBuf.putLong(dataFileSizeList.get(i));
-    }
-    headerContentBuf.flip();
-    fsDataOutputStream.writeLong(ChecksumUtils.getCrc32(headerContentBuf));
-    long len = ShuffleStorageUtils.getIndexFileHeaderLen(partitionList.size());
-    if (fsDataOutputStream.getPos() != len) {
-      throw new IOException("Fail to write index header");
-    }
-  }
-
   public long nextOffset() {
     return nextOffset;
   }
diff --git a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleIndexHeader.java b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleIndexHeader.java
deleted file mode 100644
index 07edff17..00000000
--- a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/ShuffleIndexHeader.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.storage.handler.impl;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-
-import com.google.common.collect.Lists;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.storage.util.ShuffleStorageUtils;
-
-public class ShuffleIndexHeader {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ShuffleIndexHeader.class);
-
-  private int partitionNum;
-  private List<Entry> indexes = Lists.newArrayList();
-  private long crc;
-
-  public ShuffleIndexHeader(int partitionNum, List<Entry> indexes, long crc) {
-    this.partitionNum = partitionNum;
-    this.indexes = indexes;
-    this.crc = crc;
-  }
-
-  public void setPartitionNum(int partitionNum) {
-    this.partitionNum = partitionNum;
-  }
-
-  public int getPartitionNum() {
-    return partitionNum;
-  }
-
-  public List<Entry> getIndexes() {
-    return indexes;
-  }
-
-  public long getCrc() {
-    return crc;
-  }
-
-  public void setCrc(long crc) {
-    this.crc = crc;
-  }
-
-  public int getHeaderLen() {
-    return (int) ShuffleStorageUtils.getIndexFileHeaderLen(partitionNum);
-  }
-
-  // No side effects on byteBuffer
-  public static ShuffleIndexHeader extractHeader(ByteBuffer byteBuffer) {
-    try {
-      int partitionNum = byteBuffer.getInt();
-      ByteBuffer headerContentBuf = ByteBuffer.allocate(
-          (int) ShuffleStorageUtils.getIndexFileHeaderLen(partitionNum)
-              - ShuffleStorageUtils.getHeaderCrcLen());
-      headerContentBuf.putInt(partitionNum);
-      List<Entry> entries = Lists.newArrayList();
-
-      for (int i = 0; i < partitionNum; i++) {
-        int partitionId = byteBuffer.getInt();
-        long partitionLength = byteBuffer.getLong();
-        long partitionDataFileLength = byteBuffer.getLong();
-        headerContentBuf.putInt(partitionId);
-        headerContentBuf.putLong(partitionLength);
-        headerContentBuf.putLong(partitionDataFileLength);
-
-        ShuffleIndexHeader.Entry entry
-            = new ShuffleIndexHeader.Entry(partitionId, partitionLength, partitionDataFileLength);
-        entries.add(entry);
-      }
-
-      headerContentBuf.flip();
-      long crc = byteBuffer.getLong();
-      long actualCrc = ChecksumUtils.getCrc32(headerContentBuf);
-      if (crc != actualCrc) {
-        LOG.error("Read header exception, expected crc[{}] != actual crc[{}]", crc, actualCrc);
-        return null;
-      }
-      // clear the side effect on byteBuffer
-      byteBuffer.clear();
-      return new ShuffleIndexHeader(partitionNum, entries, crc);
-    } catch (Exception e) {
-      LOG.error("Fail to extract header from {}, with exception", byteBuffer.toString(), e);
-      return null;
-    }
-  }
-
-  static class Entry {
-    int partitionId;
-    long partitionIndexLength;
-    long partitionDataLength;
-
-    Entry(int partitionId, long partitionIndexLength, long partitionDataLength) {
-      this.partitionId = partitionId;
-      this.partitionIndexLength = partitionIndexLength;
-      this.partitionDataLength = partitionDataLength;
-    }
-
-    public int getPartitionId() {
-      return partitionId;
-    }
-
-    public long getPartitionIndexLength() {
-      return partitionIndexLength;
-    }
-
-    public long getPartitionDataLength() {
-      return partitionDataLength;
-    }
-  }
-}
diff --git a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
index 4b19408f..e9abec15 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/util/ShuffleStorageUtils.java
@@ -56,15 +56,6 @@ public class ShuffleStorageUtils {
     return fileNamePrefix + Constants.SHUFFLE_INDEX_FILE_SUFFIX;
   }
 
-  public static String generateAbsoluteFilePrefix(String base, String key, int partition, String id) {
-    return String.join(
-        HDFS_PATH_SEPARATOR,
-        base,
-        key,
-        String.join(HDFS_DIRNAME_SEPARATOR, String.valueOf(partition), String.valueOf(partition)),
-        id);
-  }
-
   public static List<DataFileSegment> mergeSegments(
       String path, List<FileBasedShuffleSegment> segments, int readBufferSize) {
     List<DataFileSegment> dataFileSegments = Lists.newArrayList();
@@ -207,10 +198,6 @@ public class ShuffleStorageUtils {
     return 4 + (4 + 8 + 8) * (long) partitionNum + 8;
   }
 
-  public static int getHeaderCrcLen() {
-    return 8;
-  }
-
   public static long uploadFile(File file, HdfsFileWriter writer, int bufferSize) throws IOException {
     try (FileInputStream inputStream = new FileInputStream(file)) {
       return writer.copy(inputStream, bufferSize);

Reply via email to