sijie closed pull request #1854: Store data block header length in index and block header
URL: https://github.com/apache/incubator-pulsar/pull/1854
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request comes from a fork, GitHub does not show its diff
after the merge, so the complete diff is reproduced below:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
index cae653be2e..f49b6a2d4f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/DataBlockHeader.java
@@ -36,7 +36,7 @@
     /**
      * Get the length of the block in bytes, including the header.
      */
-    int getBlockLength();
+    long getBlockLength();
 
     /**
      * Get the message entry Id for the first message that stored in this data 
block.
@@ -46,7 +46,7 @@
     /**
      * Get the size of this DataBlockHeader.
      */
-    int getHeaderSize();
+    long getHeaderLength();
 
     /**
      * Get the content of the data block header as InputStream.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
index c7e71c721c..2bb59b4518 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlock.java
@@ -60,11 +60,16 @@
      */
     LedgerMetadata getLedgerMetadata();
 
-    /*
+    /**
      * Get the total size of the data object.
      */
     long getDataObjectLength();
 
+    /**
+     * Get the length of the header in the blocks in the data object.
+     */
+    long getDataBlockHeaderLength();
+
     /**
      * An input stream which knows the size of the stream upfront.
      */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
index c60ce88967..126b4b9057 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/OffloadIndexBlockBuilder.java
@@ -57,6 +57,12 @@
      */
     OffloadIndexBlockBuilder withDataObjectLength(long dataObjectLength);
 
+    /**
+     * Specify the length of the block headers in the data object.
+     * @param dataHeaderLength the length of the headers
+     */
+    OffloadIndexBlockBuilder withDataBlockHeaderLength(long dataHeaderLength);
+
     /**
      * Finalize the immutable OffloadIndexBlock
      */
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
index 5e46350041..65ffd377dc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/S3ManagedLedgerOffloader.java
@@ -115,7 +115,8 @@ static String indexBlockOffloadKey(long ledgerId, UUID 
uuid) {
         CompletableFuture<Void> promise = new CompletableFuture<>();
         scheduler.chooseThread(readHandle.getId()).submit(() -> {
             OffloadIndexBlockBuilder indexBuilder = 
OffloadIndexBlockBuilder.create()
-                .withLedgerMetadata(readHandle.getLedgerMetadata());
+                .withLedgerMetadata(readHandle.getLedgerMetadata())
+                
.withDataBlockHeaderLength(BlockAwareSegmentInputStreamImpl.getHeaderSize());
             String dataBlockKey = dataBlockOffloadKey(readHandle.getId(), 
uuid);
             String indexBlockKey = indexBlockOffloadKey(readHandle.getId(), 
uuid);
             InitiateMultipartUploadRequest dataBlockReq = new 
InitiateMultipartUploadRequest(bucket, dataBlockKey, new ObjectMetadata());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
index 03ae702d97..78d23802df 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/BlockAwareSegmentInputStreamImpl.java
@@ -205,6 +205,10 @@ public int getBlockEntryBytesCount() {
         return dataBlockFullOffset - DataBlockHeaderImpl.getDataStartOffset() 
- ENTRY_HEADER_SIZE * blockEntryCount;
     }
 
+    public static long getHeaderSize() {
+        return DataBlockHeaderImpl.getDataStartOffset();
+    }
+
     // Calculate the block size after uploaded `entryBytesAlreadyWritten` bytes
     public static int calculateBlockSize(int maxBlockSize, ReadHandle 
readHandle,
                                          long firstEntryToWrite, long 
entryBytesAlreadyWritten) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
index 6ea1f29e8b..19a644a363 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderImpl.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.broker.s3offload.impl;
 
+import com.google.common.io.CountingInputStream;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.PooledByteBufAllocator;
@@ -39,37 +41,38 @@
     // This is bigger than header size. Leaving some place for alignment and 
future enhancement.
     // Payload use this as the start offset.
     private static final int HEADER_MAX_SIZE = 128;
-    // The size of this header.
-    private static final int HEADER_SIZE = 4 /* magic word */
-        + 4 /* index block length */
-        + 8 /* first entry id */;
-    private static final byte[] PADDING = new byte[HEADER_MAX_SIZE - 
HEADER_SIZE];
-
+    private static final int HEADER_BYTES_USED = 4 /* magic */
+                                               + 8 /* header len */
+                                               + 8 /* block len */
+                                               + 8 /* first entry id */;
+    private static final byte[] PADDING = new byte[HEADER_MAX_SIZE - 
HEADER_BYTES_USED];
 
     public static DataBlockHeaderImpl of(int blockLength, long firstEntryId) {
-        return new DataBlockHeaderImpl(blockLength, firstEntryId);
+        return new DataBlockHeaderImpl(HEADER_MAX_SIZE, blockLength, 
firstEntryId);
     }
 
     // Construct DataBlockHeader from InputStream, which contains 
`HEADER_MAX_SIZE` bytes readable.
     public static DataBlockHeader fromStream(InputStream stream) throws 
IOException {
-        DataInputStream dis = new DataInputStream(stream);
+        CountingInputStream countingStream = new CountingInputStream(stream);
+        DataInputStream dis = new DataInputStream(countingStream);
         int magic = dis.readInt();
         if (magic != MAGIC_WORD) {
             throw new IOException("Data block header magic word not match. 
read: " + magic + " expected: " + MAGIC_WORD);
         }
 
-        int blockLen = dis.readInt();
+        long headerLen = dis.readLong();
+        long blockLen = dis.readLong();
         long firstEntryId = dis.readLong();
-
-        // padding part
-        if (PADDING.length != dis.skipBytes(PADDING.length)) {
-            throw new EOFException("Data block header magic word not match.");
+        long toSkip = headerLen - countingStream.getCount();
+        if (dis.skip(toSkip) != toSkip) {
+            throw new EOFException("Header was too small");
         }
 
-        return new DataBlockHeaderImpl(blockLen, firstEntryId);
+        return new DataBlockHeaderImpl(headerLen, blockLen, firstEntryId);
     }
 
-    private final int blockLength;
+    private final long headerLength;
+    private final long blockLength;
     private final long firstEntryId;
 
     static public int getBlockMagicWord() {
@@ -81,21 +84,22 @@ static public int getDataStartOffset() {
     }
 
     @Override
-    public int getBlockLength() {
+    public long getBlockLength() {
         return this.blockLength;
     }
 
     @Override
-    public long getFirstEntryId() {
-        return this.firstEntryId;
+    public long getHeaderLength() {
+        return this.headerLength;
     }
 
     @Override
-    public int getHeaderSize() {
-        return HEADER_MAX_SIZE;
+    public long getFirstEntryId() {
+        return this.firstEntryId;
     }
 
-    public DataBlockHeaderImpl(int blockLength, long firstEntryId) {
+    public DataBlockHeaderImpl(long headerLength, long blockLength, long 
firstEntryId) {
+        this.headerLength = headerLength;
         this.blockLength = blockLength;
         this.firstEntryId = firstEntryId;
     }
@@ -107,13 +111,10 @@ public DataBlockHeaderImpl(int blockLength, long 
firstEntryId) {
      */
     @Override
     public InputStream toStream() {
-        int headerSize = 4 /* magic word */
-            + 4 /* index block length */
-            + 8 /* first entry id */;
-
         ByteBuf out = PooledByteBufAllocator.DEFAULT.buffer(HEADER_MAX_SIZE, 
HEADER_MAX_SIZE);
         out.writeInt(MAGIC_WORD)
-            .writeInt(blockLength)
+            .writeLong(headerLength)
+            .writeLong(blockLength)
             .writeLong(firstEntryId)
             .writeBytes(PADDING);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
index 3b0f8995e2..1b03f009a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockBuilderImpl.java
@@ -35,6 +35,7 @@
 
     private LedgerMetadata ledgerMetadata;
     private long dataObjectLength;
+    private long dataHeaderLength;
     private List<OffloadIndexEntryImpl> entries;
     private int lastBlockSize;
 
@@ -48,6 +49,12 @@ public OffloadIndexBlockBuilder withDataObjectLength(long 
dataObjectLength) {
         return this;
     }
 
+    @Override
+    public OffloadIndexBlockBuilder withDataBlockHeaderLength(long 
dataHeaderLength) {
+        this.dataHeaderLength = dataHeaderLength;
+        return this;
+    }
+
     @Override
     public OffloadIndexBlockBuilder withLedgerMetadata(LedgerMetadata 
metadata) {
         this.ledgerMetadata = metadata;
@@ -56,6 +63,8 @@ public OffloadIndexBlockBuilder 
withLedgerMetadata(LedgerMetadata metadata) {
 
     @Override
     public OffloadIndexBlockBuilder addBlock(long firstEntryId, int partId, 
int blockSize) {
+        checkState(dataHeaderLength > 0);
+
         // we should added one by one.
         long offset;
         if (firstEntryId == 0) {
@@ -67,7 +76,7 @@ public OffloadIndexBlockBuilder addBlock(long firstEntryId, 
int partId, int bloc
         }
         lastBlockSize = blockSize;
 
-        this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, 
offset));
+        this.entries.add(OffloadIndexEntryImpl.of(firstEntryId, partId, 
offset, dataHeaderLength));
         return this;
     }
 
@@ -81,7 +90,8 @@ public OffloadIndexBlock build() {
         checkState(ledgerMetadata != null);
         checkState(!entries.isEmpty());
         checkState(dataObjectLength > 0);
-        return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, 
entries);
+        checkState(dataHeaderLength > 0);
+        return OffloadIndexBlockImpl.get(ledgerMetadata, dataObjectLength, 
dataHeaderLength, entries);
     }
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
index db758d0785..d0e7239b42 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexBlockImpl.java
@@ -59,6 +59,7 @@
 
     private LedgerMetadata segmentMetadata;
     private long dataObjectLength;
+    private long dataHeaderLength;
     private TreeMap<Long, OffloadIndexEntryImpl> indexEntries;
 
     private final Handle<OffloadIndexBlockImpl> recyclerHandle;
@@ -75,6 +76,7 @@ private OffloadIndexBlockImpl(Handle<OffloadIndexBlockImpl> 
recyclerHandle) {
     }
 
     public static OffloadIndexBlockImpl get(LedgerMetadata metadata, long 
dataObjectLength,
+                                            long dataHeaderLength,
                                             List<OffloadIndexEntryImpl> 
entries) {
         OffloadIndexBlockImpl block = RECYCLER.get();
         block.indexEntries = Maps.newTreeMap();
@@ -82,6 +84,7 @@ public static OffloadIndexBlockImpl get(LedgerMetadata 
metadata, long dataObject
         checkState(entries.size() == block.indexEntries.size());
         block.segmentMetadata = metadata;
         block.dataObjectLength = dataObjectLength;
+        block.dataHeaderLength = dataHeaderLength;
         return block;
     }
 
@@ -94,6 +97,7 @@ public static OffloadIndexBlockImpl get(InputStream stream) 
throws IOException {
 
     public void recycle() {
         dataObjectLength = -1;
+        dataHeaderLength = -1;
         segmentMetadata = null;
         indexEntries.clear();
         indexEntries = null;
@@ -129,6 +133,11 @@ public long getDataObjectLength() {
         return this.dataObjectLength;
     }
 
+    @Override
+    public long getDataBlockHeaderLength() {
+        return this.dataHeaderLength;
+    }
+
     private static byte[] buildLedgerMetadataFormat(LedgerMetadata metadata) {
         LedgerMetadataFormat.Builder builder = 
LedgerMetadataFormat.newBuilder();
         builder.setQuorumSize(metadata.getWriteQuorumSize())
@@ -170,6 +179,7 @@ public long getDataObjectLength() {
         int indexBlockLength = 4 /* magic header */
             + 4 /* index block length */
             + 8 /* data object length */
+            + 8 /* data header length */
             + 4 /* segment metadata length */
             + 4 /* index entry count */
             + segmentMetadataLength
@@ -180,6 +190,7 @@ public long getDataObjectLength() {
         out.writeInt(INDEX_MAGIC_WORD)
             .writeInt(indexBlockLength)
             .writeLong(dataObjectLength)
+            .writeLong(dataHeaderLength)
             .writeInt(segmentMetadataLength)
             .writeInt(indexEntryCount);
 
@@ -319,6 +330,7 @@ private OffloadIndexBlock fromStream(InputStream stream) 
throws IOException {
         }
         int indexBlockLength = dis.readInt();
         this.dataObjectLength = dis.readLong();
+        this.dataHeaderLength = dis.readLong();
         int segmentMetadataLength = dis.readInt();
         int indexEntryCount = dis.readInt();
 
@@ -332,7 +344,8 @@ private OffloadIndexBlock fromStream(InputStream stream) 
throws IOException {
 
         for (int i = 0; i < indexEntryCount; i ++) {
             long entryId = dis.readLong();
-            this.indexEntries.putIfAbsent(entryId, 
OffloadIndexEntryImpl.of(entryId, dis.readInt(), dis.readLong()));
+            this.indexEntries.putIfAbsent(entryId, 
OffloadIndexEntryImpl.of(entryId, dis.readInt(),
+                                                                            
dis.readLong(), dataHeaderLength));
         }
 
         return this;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
index d8d22675ea..b83de85e22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexEntryImpl.java
@@ -26,15 +26,14 @@
  *
  */
 public class OffloadIndexEntryImpl implements OffloadIndexEntry {
-    public static OffloadIndexEntryImpl of(long entryId, int partId, long 
offset) {
-        return new OffloadIndexEntryImpl(entryId, partId, offset);
+    public static OffloadIndexEntryImpl of(long entryId, int partId, long 
offset, long blockHeaderSize) {
+        return new OffloadIndexEntryImpl(entryId, partId, offset, 
blockHeaderSize);
     }
 
     private final long entryId;
-
     private final int partId;
-
     private final long offset;
+    private final long blockHeaderSize;
 
     @Override
     public long getEntryId() {
@@ -50,13 +49,14 @@ public long getOffset() {
     }
     @Override
     public long getDataOffset() {
-        return offset + DataBlockHeaderImpl.getDataStartOffset();
+        return offset + blockHeaderSize;
     }
 
-    public OffloadIndexEntryImpl(long entryId, int partId, long offset) {
+    private OffloadIndexEntryImpl(long entryId, int partId, long offset, long 
blockHeaderSize) {
         this.entryId = entryId;
         this.partId = partId;
         this.offset = offset;
+        this.blockHeaderSize = blockHeaderSize;
     }
 }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java
index f3515ed858..a658032881 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/DataBlockHeaderTest.java
@@ -23,6 +23,7 @@
 import static org.testng.Assert.fail;
 
 import java.io.ByteArrayInputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import lombok.extern.slf4j.Slf4j;
@@ -34,22 +35,22 @@
 
     @Test
     public void dataBlockHeaderImplTest() throws Exception {
-        int headerLength = 1024 * 1024;
+        int blockLength = 1024 * 1024;
         long firstEntryId = 3333L;
 
-        DataBlockHeaderImpl dataBlockHeader = 
DataBlockHeaderImpl.of(headerLength,
+        DataBlockHeaderImpl dataBlockHeader = 
DataBlockHeaderImpl.of(blockLength,
             firstEntryId);
 
         // verify get methods
         assertEquals(dataBlockHeader.getBlockMagicWord(), 
DataBlockHeaderImpl.MAGIC_WORD);
-        assertEquals(dataBlockHeader.getBlockLength(), headerLength);
+        assertEquals(dataBlockHeader.getBlockLength(), blockLength);
         assertEquals(dataBlockHeader.getFirstEntryId(), firstEntryId);
 
         // verify toStream and fromStream
         InputStream stream = dataBlockHeader.toStream();
         stream.mark(0);
         DataBlockHeader rebuild = DataBlockHeaderImpl.fromStream(stream);
-        assertEquals(rebuild.getBlockLength(), headerLength);
+        assertEquals(rebuild.getBlockLength(), blockLength);
         assertEquals(rebuild.getFirstEntryId(), firstEntryId);
         // verify InputStream reach end
         assertEquals(stream.read(), -1);
@@ -72,8 +73,8 @@ public void dataBlockHeaderImplTest() throws Exception {
                 new ByteArrayInputStream(streamContent, 0, 
DataBlockHeaderImpl.getDataStartOffset() - 1)) {
             DataBlockHeader rebuild3 = DataBlockHeaderImpl.fromStream(stream3);
             fail("Should throw EOFException");
-        } catch (Exception e) {
-            assertTrue(e instanceof java.io.EOFException);
+        } catch (EOFException e) {
+            // expected
         }
 
         stream.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
index af9d6e4084..cff404582e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/s3offload/impl/OffloadIndexTest.java
@@ -47,17 +47,19 @@
     @Test
     public void offloadIndexEntryImplTest() {
         // verify OffloadIndexEntryImpl builder
-        OffloadIndexEntryImpl entry1 = OffloadIndexEntryImpl.of(0, 2, 0);
-        OffloadIndexEntryImpl entry2 = OffloadIndexEntryImpl.of(100, 3, 1234);
+        OffloadIndexEntryImpl entry1 = OffloadIndexEntryImpl.of(0, 2, 0, 20);
+        OffloadIndexEntryImpl entry2 = OffloadIndexEntryImpl.of(100, 3, 1234, 
20);
 
         // verify OffloadIndexEntryImpl get
         assertEquals(entry1.getEntryId(), 0L);
         assertEquals(entry1.getPartId(), 2);
         assertEquals(entry1.getOffset(), 0L);
+        assertEquals(entry1.getDataOffset(), 20L);
 
         assertEquals(entry2.getEntryId(), 100L);
         assertEquals(entry2.getPartId(), 3);
         assertEquals(entry2.getOffset(), 1234L);
+        assertEquals(entry2.getDataOffset(), 1254L);
     }
 
 
@@ -108,7 +110,7 @@ public void offloadIndexBlockImplTest() throws Exception {
         LedgerMetadata metadata = createLedgerMetadata();
         log.debug("created metadata: {}", metadata.toString());
 
-        blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1);
+        
blockBuilder.withLedgerMetadata(metadata).withDataObjectLength(1).withDataBlockHeaderLength(23455);
 
         blockBuilder.addBlock(0, 2, 64 * 1024 * 1024);
         blockBuilder.addBlock(1000, 3, 64 * 1024 * 1024);
@@ -162,6 +164,7 @@ public void offloadIndexBlockImplTest() throws Exception {
         int magic = wrapper.readInt();
         int indexBlockLength = wrapper.readInt();
         long dataObjectLength = wrapper.readLong();
+        long dataHeaderLength = wrapper.readLong();
         int segmentMetadataLength = wrapper.readInt();
         int indexEntryCount = wrapper.readInt();
 
@@ -170,25 +173,32 @@ public void offloadIndexBlockImplTest() throws Exception {
         assertEquals(indexBlockLength, readoutLen);
         assertEquals(indexEntryCount, 3);
         assertEquals(dataObjectLength, 1);
+        assertEquals(dataHeaderLength, 23455);
 
         wrapper.readBytes(segmentMetadataLength);
         log.debug("magic: {}, blockLength: {}, metadataLength: {}, indexCount: 
{}",
             magic, indexBlockLength, segmentMetadataLength, indexEntryCount);
 
         // verify entry
-        OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(), wrapper.readLong());
-        OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(), wrapper.readLong());
-        OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(), wrapper.readLong());;
+        OffloadIndexEntry e1 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(),
+                                                        wrapper.readLong(), 
dataHeaderLength);
+        OffloadIndexEntry e2 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(),
+                                                        wrapper.readLong(), 
dataHeaderLength);
+        OffloadIndexEntry e3 = OffloadIndexEntryImpl.of(wrapper.readLong(), 
wrapper.readInt(),
+                                                        wrapper.readLong(), 
dataHeaderLength);;
 
         assertEquals(e1.getEntryId(),entry1.getEntryId());
         assertEquals(e1.getPartId(), entry1.getPartId());
         assertEquals(e1.getOffset(), entry1.getOffset());
+        assertEquals(e1.getDataOffset(), entry1.getDataOffset());
         assertEquals(e2.getEntryId(), entry2.getEntryId());
         assertEquals(e2.getPartId(), entry2.getPartId());
         assertEquals(e2.getOffset(), entry2.getOffset());
+        assertEquals(e2.getDataOffset(), entry2.getDataOffset());
         assertEquals(e3.getEntryId(), entry3.getEntryId());
         assertEquals(e3.getPartId(), entry3.getPartId());
         assertEquals(e3.getOffset(), entry3.getOffset());
+        assertEquals(e3.getDataOffset(), entry3.getDataOffset());
         wrapper.release();
 
         // verify build OffloadIndexBlock from InputStream


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to