>From Wail Alkowaileet <wael....@gmail.com>:

Wail Alkowaileet has submitted this change. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446 )

Change subject: [ASTERIXDB-3453][STO] Incompressible pages are not written as 
full pages
......................................................................

[ASTERIXDB-3453][STO] Incompressible pages are not written as full pages

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Incompressible pages must be written as full pages (i.e., as
page + header) entirely to ensure the position of the cloud
files are in-sync with local files.

Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Wail Alkowaileet <wael....@gmail.com>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
---
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
M 
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
M 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
M 
hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
7 files changed, 94 insertions(+), 16 deletions(-)

Approvals:
  Murtadha Hubail: Looks good to me, approved
  Wail Alkowaileet: Looks good to me, but someone else must approve
  Jenkins: Verified; Verified

Objections:
  Anon. E. Moose #1000171: Violations found




diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 91c24e8..033f135 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -210,10 +210,11 @@
     }

     @Override
-    public final int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws 
HyracksDataException {
+    public final int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer 
data) throws HyracksDataException {
         ICloudWriter cloudWriter = ((CloudFileHandle) 
fHandle).getCloudWriter();
         int writtenBytes;
         try {
+            ensurePosition(fHandle, cloudWriter.position(), offset);
             writtenBytes = cloudWriter.write(data);
         } catch (HyracksDataException e) {
             cloudWriter.abort();
@@ -223,10 +224,11 @@
     }

     @Override
-    public final long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) 
throws HyracksDataException {
+    public final long cloudWrite(IFileHandle fHandle, long offset, 
ByteBuffer[] data) throws HyracksDataException {
         ICloudWriter cloudWriter = ((CloudFileHandle) 
fHandle).getCloudWriter();
         int writtenBytes;
         try {
+            ensurePosition(fHandle, cloudWriter.position(), offset);
             writtenBytes = cloudWriter.write(data[0], data[1]);
         } catch (HyracksDataException e) {
             cloudWriter.abort();
@@ -265,18 +267,33 @@
     @Override
     public final long doSyncWrite(IFileHandle fHandle, long offset, 
ByteBuffer[] dataArray)
             throws HyracksDataException {
+        // Save original position and limit
+        ByteBuffer buffer1 = dataArray[0];
+        int position1 = buffer1.position();
+
+        ByteBuffer buffer2 = dataArray[1];
+        int position2 = buffer2.position();
+
         long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, 
dataArray);
-        dataArray[0].flip();
-        dataArray[1].flip();
-        cloudWrite(fHandle, dataArray);
+
+        // Restore original position
+        buffer1.position(position1);
+        buffer2.position(position2);
+
+        cloudWrite(fHandle, offset, dataArray);
         return writtenBytes;
     }

     @Override
     public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer 
data) throws HyracksDataException {
+        // Save original position and limit
+        int position = data.position();
+
         int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, data);
-        data.flip();
-        cloudWrite(fHandle, data);
+
+        // Restore original position
+        data.position(position);
+        cloudWrite(fHandle, offset, data);
         return writtenBytes;
     }

@@ -390,4 +407,11 @@
             performBulkOperation(deleteBulkOperation);
         }
     }
+
+    private void ensurePosition(IFileHandle fileHandle, long cloudOffset, long 
requestedWriteOffset) {
+        if (cloudOffset != requestedWriteOffset) {
+            throw new IllegalStateException("Misaligned positions in " + 
fileHandle.getFileReference()
+                    + ", cloudOffset: " + cloudOffset + " != 
requestedWriteOffset: " + requestedWriteOffset);
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index cace898..a233ca5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -36,10 +36,12 @@
     private final IWriteBufferProvider bufferProvider;
     private final ICloudBufferedWriter bufferedWriter;
     private ByteBuffer writeBuffer;
+    private long writtenBytes;

     public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, 
IWriteBufferProvider bufferProvider) {
         this.bufferedWriter = bufferedWriter;
         this.bufferProvider = bufferProvider;
+        writtenBytes = 0;
     }
 
     /* ************************************************************
@@ -75,7 +77,7 @@
     @Override
     public int write(ByteBuffer page) throws HyracksDataException {
         open();
-        return write(page.array(), 0, page.limit());
+        return write(page.array(), page.position(), page.remaining());
     }

     @Override
@@ -84,6 +86,7 @@
             uploadAndWait();
         }
         writeBuffer.put((byte) b);
+        writtenBytes += 1;
     }

     @Override
@@ -102,7 +105,7 @@
             // enough to write all
             if (writeBuffer.remaining() > pageRemaining) {
                 writeBuffer.put(b, offset, pageRemaining);
-                return len;
+                break;
             }

             int remaining = writeBuffer.remaining();
@@ -112,10 +115,16 @@
             uploadAndWait();
         }

+        writtenBytes += len;
         return len;
     }

     @Override
+    public long position() {
+        return writtenBytes;
+    }
+
+    @Override
     public int read(byte[] b, int off, int len) throws IOException {
         if (writeBuffer.remaining() == 0) {
             return -1;
@@ -173,6 +182,7 @@
         if (writeBuffer == null) {
             writeBuffer = bufferProvider.getBuffer();
             writeBuffer.clear();
+            writtenBytes = 0;
         }
     }

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
index 15822c4..920be9c 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
@@ -61,6 +61,11 @@
     int write(byte[] b, int off, int len) throws HyracksDataException;

     /**
+     * @return the current position of the writer
+     */
+    long position();
+
+    /**
      * Finish the write operation
      * Note: this should be called upon successful write
      */
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index a6dade5..d9119a5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -41,12 +41,14 @@
     private final IRequestProfiler profiler;
     private final Storage gcsClient;
     private WriteChannel writer = null;
+    private long writtenBytes;

     public GCSWriter(String bucket, String path, Storage gcsClient, 
IRequestProfiler profiler) {
         this.bucket = bucket;
         this.path = path;
         this.profiler = profiler;
         this.gcsClient = gcsClient;
+        writtenBytes = 0;
     }

     @Override
@@ -67,17 +69,26 @@
             throw HyracksDataException.create(e);
         }

+        writtenBytes += written;
         return written;
     }

     @Override
     public int write(byte[] b, int off, int len) throws HyracksDataException {
-        return write(ByteBuffer.wrap(b, off, len));
+        int written = write(ByteBuffer.wrap(b, off, len));
+        writtenBytes += written;
+        return written;
+    }
+
+    @Override
+    public long position() {
+        return writtenBytes;
     }

     @Override
     public void write(int b) throws HyracksDataException {
         write(ByteBuffer.wrap(new byte[] { (byte) b }));
+        writtenBytes += 1;
     }

     @Override
@@ -105,6 +116,7 @@
         if (writer == null) {
             writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, 
path)).build());
             writer.setChunkSize(WRITE_BUFFER_SIZE);
+            writtenBytes = 0;
             log("STARTED");
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
index c75e83a..69d33e8 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java
@@ -36,13 +36,13 @@
     public int write(IOManager ioManager, IFileHandle handle, long offset, 
ByteBuffer data)
             throws HyracksDataException {
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
-        return cloudIOManager.cloudWrite(handle, data);
+        return cloudIOManager.cloudWrite(handle, offset, data);
     }

     @Override
     public long write(IOManager ioManager, IFileHandle handle, long offset, 
ByteBuffer[] data)
             throws HyracksDataException {
         ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager;
-        return cloudIOManager.cloudWrite(handle, data);
+        return cloudIOManager.cloudWrite(handle, offset, data);
     }
 }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
index ab57139..9b6a2ff 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java
@@ -74,19 +74,21 @@
      * Write to cloud only
      *
      * @param fHandle file handle
+     * @param offset  position to write from
      * @param data    to write
      * @return number of written bytes
      */
-    int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws 
HyracksDataException;
+    int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws 
HyracksDataException;

     /**
      * Write to cloud only
      *
      * @param fHandle file handle
+     * @param offset  position to write from
      * @param data    to write
      * @return number of written bytes
      */
-    long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws 
HyracksDataException;
+    long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) 
throws HyracksDataException;

     /**
      * Punch a hole in a file
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6ad4d27..6bc85ff 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -123,8 +123,10 @@
                 expectedBytesWritten = cBuffer.limit();
                 bytesWritten = context.write(ioManager, handle, offset, 
cBuffer);
             } else {
-                //Compression did not gain any savings
+                // Compression did not gain any savings
                 final ByteBuffer[] buffers = header.prepareWrite(cPage);
+                // Incompressible pages should be written entirely
+                fixBufferPointers(buffers[1], 0);
                 offset = compressedFileManager.writePageInfo(pageId, 
bufferCache.getPageSizeWithHeader());
                 expectedBytesWritten = buffers[0].limit() + (long) 
buffers[1].limit();
                 bytesWritten = context.write(ioManager, handle, offset, 
buffers);
@@ -152,7 +154,7 @@
         long bytesWritten = 0;
         for (int i = 1; i < totalPages; i++) {
             fixBufferPointers(uBuffer, i);
-            cBuffer.position(0);
+            cBuffer.clear();

             final ByteBuffer writeBuffer;
             if (compressToWriteBuffer(uBuffer, cBuffer) < 
bufferCache.getPageSize()) {

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
Gerrit-Change-Number: 18446
Gerrit-PatchSet: 4
Gerrit-Owner: Wail Alkowaileet <wael....@gmail.com>
Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com>
Gerrit-Reviewer: Anon. E. Moose #1000171
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Wail Alkowaileet <wael....@gmail.com>
Gerrit-MessageType: merged

Reply via email to