From Wail Alkowaileet <wael....@gmail.com>: Wail Alkowaileet has submitted this change. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446 )
Change subject: [ASTERIXDB-3453][STO] Incompressible pages are not written as full pages ...................................................................... [ASTERIXDB-3453][STO] Incompressible pages are not written as full pages - user model changes: no - storage format changes: no - interface changes: yes Details: Incompressible pages must be written as full pages (i.e., as page + header) entirely to ensure the positions of the cloud files are in sync with the local files. Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446 Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Wail Alkowaileet <wael....@gmail.com> Reviewed-by: Murtadha Hubail <mhub...@apache.org> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> --- M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java M hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java M hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java 7 files changed, 94 insertions(+), 16 deletions(-) Approvals: Murtadha Hubail: Looks good to me, approved Wail Alkowaileet: Looks good to me, but someone else must approve Jenkins: Verified; Verified Objections: Anon. E. 
Moose #1000171: Violations found diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java index 91c24e8..033f135 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java @@ -210,10 +210,11 @@ } @Override - public final int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException { + public final int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter(); int writtenBytes; try { + ensurePosition(fHandle, cloudWriter.position(), offset); writtenBytes = cloudWriter.write(data); } catch (HyracksDataException e) { cloudWriter.abort(); @@ -223,10 +224,11 @@ } @Override - public final long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException { + public final long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException { ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter(); int writtenBytes; try { + ensurePosition(fHandle, cloudWriter.position(), offset); writtenBytes = cloudWriter.write(data[0], data[1]); } catch (HyracksDataException e) { cloudWriter.abort(); @@ -265,18 +267,33 @@ @Override public final long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException { + // Save original position and limit + ByteBuffer buffer1 = dataArray[0]; + int position1 = buffer1.position(); + + ByteBuffer buffer2 = dataArray[1]; + int position2 = buffer2.position(); + long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray); - dataArray[0].flip(); - dataArray[1].flip(); - cloudWrite(fHandle, dataArray); + + // Restore original position + 
buffer1.position(position1); + buffer2.position(position2); + + cloudWrite(fHandle, offset, dataArray); return writtenBytes; } @Override public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException { + // Save original position and limit + int position = data.position(); + int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, data); - data.flip(); - cloudWrite(fHandle, data); + + // Restore original position + data.position(position); + cloudWrite(fHandle, offset, data); return writtenBytes; } @@ -390,4 +407,11 @@ performBulkOperation(deleteBulkOperation); } } + + private void ensurePosition(IFileHandle fileHandle, long cloudOffset, long requestedWriteOffset) { + if (cloudOffset != requestedWriteOffset) { + throw new IllegalStateException("Misaligned positions in " + fileHandle.getFileReference() + + ", cloudOffset: " + cloudOffset + " != requestedWriteOffset: " + requestedWriteOffset); + } + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java index cace898..a233ca5 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java @@ -36,10 +36,12 @@ private final IWriteBufferProvider bufferProvider; private final ICloudBufferedWriter bufferedWriter; private ByteBuffer writeBuffer; + private long writtenBytes; public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, IWriteBufferProvider bufferProvider) { this.bufferedWriter = bufferedWriter; this.bufferProvider = bufferProvider; + writtenBytes = 0; } /* ************************************************************ @@ -75,7 +77,7 @@ @Override public int write(ByteBuffer page) throws HyracksDataException { open(); - return write(page.array(), 0, 
page.limit()); + return write(page.array(), page.position(), page.remaining()); } @Override @@ -84,6 +86,7 @@ uploadAndWait(); } writeBuffer.put((byte) b); + writtenBytes += 1; } @Override @@ -102,7 +105,7 @@ // enough to write all if (writeBuffer.remaining() > pageRemaining) { writeBuffer.put(b, offset, pageRemaining); - return len; + break; } int remaining = writeBuffer.remaining(); @@ -112,10 +115,16 @@ uploadAndWait(); } + writtenBytes += len; return len; } @Override + public long position() { + return writtenBytes; + } + + @Override public int read(byte[] b, int off, int len) throws IOException { if (writeBuffer.remaining() == 0) { return -1; @@ -173,6 +182,7 @@ if (writeBuffer == null) { writeBuffer = bufferProvider.getBuffer(); writeBuffer.clear(); + writtenBytes = 0; } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java index 15822c4..920be9c 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java @@ -61,6 +61,11 @@ int write(byte[] b, int off, int len) throws HyracksDataException; /** + * @return the current position of the writer + */ + long position(); + + /** * Finish the write operation * Note: this should be called upon successful write */ diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java index a6dade5..d9119a5 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java @@ -41,12 +41,14 @@ private final IRequestProfiler profiler; private final Storage gcsClient; 
private WriteChannel writer = null; + private long writtenBytes; public GCSWriter(String bucket, String path, Storage gcsClient, IRequestProfiler profiler) { this.bucket = bucket; this.path = path; this.profiler = profiler; this.gcsClient = gcsClient; + writtenBytes = 0; } @Override @@ -67,17 +69,26 @@ throw HyracksDataException.create(e); } + writtenBytes += written; return written; } @Override public int write(byte[] b, int off, int len) throws HyracksDataException { - return write(ByteBuffer.wrap(b, off, len)); + int written = write(ByteBuffer.wrap(b, off, len)); + writtenBytes += written; + return written; + } + + @Override + public long position() { + return writtenBytes; } @Override public void write(int b) throws HyracksDataException { write(ByteBuffer.wrap(new byte[] { (byte) b })); + writtenBytes += 1; } @Override @@ -105,6 +116,7 @@ if (writer == null) { writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, path)).build()); writer.setChunkSize(WRITE_BUFFER_SIZE); + writtenBytes = 0; log("STARTED"); } } diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java index c75e83a..69d33e8 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/buffercache/context/DefaultCloudOnlyWriteContext.java @@ -36,13 +36,13 @@ public int write(IOManager ioManager, IFileHandle handle, long offset, ByteBuffer data) throws HyracksDataException { ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager; - return cloudIOManager.cloudWrite(handle, data); + return cloudIOManager.cloudWrite(handle, offset, data); } @Override public long write(IOManager ioManager, IFileHandle 
handle, long offset, ByteBuffer[] data) throws HyracksDataException { ICloudIOManager cloudIOManager = (ICloudIOManager) ioManager; - return cloudIOManager.cloudWrite(handle, data); + return cloudIOManager.cloudWrite(handle, offset, data); } } diff --git a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java index ab57139..9b6a2ff 100644 --- a/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java +++ b/hyracks-fullstack/hyracks/hyracks-cloud/src/main/java/org/apache/hyracks/cloud/io/ICloudIOManager.java @@ -74,19 +74,21 @@ * Write to cloud only * * @param fHandle file handle + * @param offset position to write from * @param data to write * @return number of written bytes */ - int cloudWrite(IFileHandle fHandle, ByteBuffer data) throws HyracksDataException; + int cloudWrite(IFileHandle fHandle, long offset, ByteBuffer data) throws HyracksDataException; /** * Write to cloud only * * @param fHandle file handle + * @param offset position to write from * @param data to write * @return number of written bytes */ - long cloudWrite(IFileHandle fHandle, ByteBuffer[] data) throws HyracksDataException; + long cloudWrite(IFileHandle fHandle, long offset, ByteBuffer[] data) throws HyracksDataException; /** * Punch a hole in a file diff --git a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java index 6ad4d27..6bc85ff 100644 --- a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java +++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java @@ -123,8 +123,10 @@ expectedBytesWritten = cBuffer.limit(); bytesWritten = context.write(ioManager, handle, offset, cBuffer); } else { - //Compression did not gain any savings + // Compression did not gain any savings final ByteBuffer[] buffers = header.prepareWrite(cPage); + // Incompressible pages should be written entirely + fixBufferPointers(buffers[1], 0); offset = compressedFileManager.writePageInfo(pageId, bufferCache.getPageSizeWithHeader()); expectedBytesWritten = buffers[0].limit() + (long) buffers[1].limit(); bytesWritten = context.write(ioManager, handle, offset, buffers); @@ -152,7 +154,7 @@ long bytesWritten = 0; for (int i = 1; i < totalPages; i++) { fixBufferPointers(uBuffer, i); - cBuffer.position(0); + cBuffer.clear(); final ByteBuffer writeBuffer; if (compressToWriteBuffer(uBuffer, cBuffer) < bufferCache.getPageSize()) { -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6 Gerrit-Change-Number: 18446 Gerrit-PatchSet: 4 Gerrit-Owner: Wail Alkowaileet <wael....@gmail.com> Gerrit-Reviewer: Ali Alsuliman <ali.al.solai...@gmail.com> Gerrit-Reviewer: Anon. E. Moose #1000171 Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Wail Alkowaileet <wael....@gmail.com> Gerrit-MessageType: merged