From Wail Alkowaileet <wael....@gmail.com>: Wail Alkowaileet has uploaded this change for review. ( https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18259 )
Change subject: [WIP] Refactor Cloud writer ...................................................................... [WIP] Refactor Cloud writer Change-Id: I52124696b50d6dcc1f3c65b7e0fe251df1579ac5 --- M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java R asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudResettableInputStream.java A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java R asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/S3BufferedWriter.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java R asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/WriterSingleBufferProvider.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java R asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/IWriteBufferProvider.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java R asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/WriteBufferProvider.java M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java M asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java M asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java A asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/NoOpCloudWriter.java 22 files changed, 307 insertions(+), 118 deletions(-) git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb refs/changes/59/18259/1 diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java index 368be26..966e311 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java @@ -34,6 +34,10 @@ import org.apache.asterix.cloud.bulk.NoOpDeleteBulkCallBack; import org.apache.asterix.cloud.clients.CloudClientProvider; import org.apache.asterix.cloud.clients.ICloudClient; +import org.apache.asterix.cloud.clients.ICloudWriter; +import org.apache.asterix.cloud.clients.NoOpCloudWriter; +import org.apache.asterix.cloud.clients.aws.s3.write.IWriteBufferProvider; +import org.apache.asterix.cloud.clients.aws.s3.write.WriteBufferProvider; import org.apache.asterix.cloud.util.CloudFileUtil; import org.apache.asterix.common.api.INamespacePathResolver; import org.apache.asterix.common.cloud.IPartitionBootstrapper; @@ -71,7 +75,7 @@ this.bucket = cloudProperties.getStorageBucket(); cloudClient = CloudClientProvider.getClient(cloudProperties); int numOfThreads = getIODevices().size() * getIOParallelism(); - writeBufferProvider = new WriteBufferProvider(numOfThreads); + writeBufferProvider = new WriteBufferProvider(numOfThreads, 
cloudClient.getWriteBufferSize()); partitions = new HashSet<>(); partitionPaths = new ArrayList<>(); this.localIoManager = ioManager; @@ -165,8 +169,10 @@ @Override public final IFileHandle open(FileReference fileRef, FileReadWriteMode rwMode, FileSyncMode syncMode) throws HyracksDataException { - CloudFileHandle fHandle = new CloudFileHandle(cloudClient, bucket, fileRef, writeBufferProvider); - onOpen(fHandle, rwMode, syncMode); + ICloudWriter cloudWriter = exists(fileRef) ? NoOpCloudWriter.INSTANCE + : cloudClient.createdWriter(bucket, fileRef.getRelativePath(), writeBufferProvider); + CloudFileHandle fHandle = new CloudFileHandle(fileRef, cloudWriter); + onOpen(fHandle); try { fHandle.open(rwMode, syncMode); } catch (IOException e) { @@ -180,18 +186,17 @@ * * @param fileHandle file to open */ - protected abstract void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode) - throws HyracksDataException; + protected abstract void onOpen(CloudFileHandle fileHandle) throws HyracksDataException; @Override public final long doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer[] dataArray) throws HyracksDataException { long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray); - CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream(); + ICloudWriter cloudWriter = ((CloudFileHandle) fHandle).getCloudWriter(); try { - inputStream.write(dataArray[0], dataArray[1]); + cloudWriter.write(dataArray[0], dataArray[1]); } catch (HyracksDataException e) { - inputStream.abort(); + cloudWriter.abort(); throw e; } return writtenBytes; @@ -200,11 +205,11 @@ @Override public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer dataArray) throws HyracksDataException { int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, dataArray); - CloudResettableInputStream inputStream = ((CloudFileHandle) fHandle).getInputStream(); + ICloudWriter cloudWriter = ((CloudFileHandle) 
fHandle).getCloudWriter(); try { - inputStream.write(dataArray); + cloudWriter.write(dataArray); } catch (HyracksDataException e) { - inputStream.abort(); + cloudWriter.abort(); throw e; } return writtenBytes; @@ -231,16 +236,16 @@ if (metadata) { // only finish writing if metadata == true to prevent write limiter from finishing the stream and // completing the upload. - CloudResettableInputStream stream = ((CloudFileHandle) fileHandle).getInputStream(); + ICloudWriter cloudWriter = ((CloudFileHandle) fileHandle).getCloudWriter(); try { - stream.finish(); + cloudWriter.finish(); } catch (HyracksDataException e) { savedEx = e; } if (savedEx != null) { try { - stream.abort(); + cloudWriter.abort(); } catch (HyracksDataException e) { savedEx.addSuppressed(e); } @@ -286,7 +291,7 @@ /** * Writes the bytes to the specified key in the bucket * - * @param key the key where the bytes will be written + * @param key the key where the bytes will be written * @param bytes the bytes to write */ public final void put(String key, byte[] bytes) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java index 14c44ad..2ba272a 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudFileHandle.java @@ -20,20 +20,17 @@ import java.io.IOException; -import org.apache.asterix.cloud.clients.ICloudBufferedWriter; -import org.apache.asterix.cloud.clients.ICloudClient; +import org.apache.asterix.cloud.clients.ICloudWriter; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.control.nc.io.FileHandle; public class CloudFileHandle extends FileHandle { - private final CloudResettableInputStream inputStream; + private final ICloudWriter cloudWriter; - public CloudFileHandle(ICloudClient 
cloudClient, String bucket, FileReference fileRef, - IWriteBufferProvider bufferProvider) { + public CloudFileHandle(FileReference fileRef, ICloudWriter cloudWriter) { super(fileRef); - ICloudBufferedWriter bufferedWriter = cloudClient.createBufferedWriter(bucket, fileRef.getRelativePath()); - inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider); + this.cloudWriter = cloudWriter; } @Override @@ -45,11 +42,11 @@ @Override public synchronized void close() throws IOException { - inputStream.close(); + cloudWriter.close(); super.close(); } - public CloudResettableInputStream getInputStream() { - return inputStream; + public ICloudWriter getCloudWriter() { + return cloudWriter; } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java index 349b1b1..bea91fb 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudOutputStream.java @@ -21,29 +21,31 @@ import java.io.IOException; import java.io.OutputStream; -public final class CloudOutputStream extends OutputStream { - private final CloudResettableInputStream inputStream; +import org.apache.asterix.cloud.clients.ICloudWriter; - public CloudOutputStream(CloudResettableInputStream inputStream) { - this.inputStream = inputStream; +public final class CloudOutputStream extends OutputStream { + private final ICloudWriter cloudWriter; + + public CloudOutputStream(ICloudWriter cloudWriter) { + this.cloudWriter = cloudWriter; } @Override public void write(byte[] b, int off, int len) throws IOException { - inputStream.write(b, off, len); + cloudWriter.write(b, off, len); } @Override public void write(int b) throws IOException { - inputStream.write(b); + cloudWriter.write(b); } @Override public void close() throws IOException { - inputStream.finish(); + 
cloudWriter.finish(); } public void abort() throws IOException { - inputStream.abort(); + cloudWriter.abort(); } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java index d0b982c..0b4200c 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/EagerCloudIOManager.java @@ -65,7 +65,7 @@ } @Override - protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode) { + protected void onOpen(CloudFileHandle fileHandle) { // NoOp } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java index 6ecd201..cb47d00 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/LazyCloudIOManager.java @@ -136,9 +136,8 @@ } @Override - protected void onOpen(CloudFileHandle fileHandle, FileReadWriteMode rwMode, FileSyncMode syncMode) - throws HyracksDataException { - accessor.doOnOpen(fileHandle, rwMode, syncMode); + protected void onOpen(CloudFileHandle fileHandle) throws HyracksDataException { + accessor.doOnOpen(fileHandle); } /* diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java index 21450c4..8a83c2a 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudClientProvider.java @@ -20,9 +20,12 @@ import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig; import 
org.apache.asterix.cloud.clients.aws.s3.S3CloudClient; +import org.apache.asterix.cloud.clients.aws.s3.write.IWriteBufferProvider; +import org.apache.asterix.cloud.clients.aws.s3.write.WriteBufferProvider; import org.apache.asterix.common.config.CloudProperties; public class CloudClientProvider { + private static final String S3 = "s3"; private CloudClientProvider() { throw new AssertionError("do not instantiate"); @@ -30,10 +33,14 @@ public static ICloudClient getClient(CloudProperties cloudProperties) { String storageScheme = cloudProperties.getStorageScheme(); - if ("s3".equalsIgnoreCase(storageScheme)) { + if (S3.equalsIgnoreCase(storageScheme)) { S3ClientConfig config = S3ClientConfig.of(cloudProperties); return new S3CloudClient(config); } throw new IllegalStateException("unsupported cloud storage scheme: " + storageScheme); } + + public static IWriteBufferProvider createBufferProvider(int writeBufferSize, int numOfThreads) { + return new WriteBufferProvider(writeBufferSize, numOfThreads); + } } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudResettableInputStream.java similarity index 84% rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudResettableInputStream.java index 0533184..6232957 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/CloudResettableInputStream.java @@ -16,21 +16,19 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.cloud; +package org.apache.asterix.cloud.clients; import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; -import org.apache.asterix.cloud.clients.ICloudBufferedWriter; +import org.apache.asterix.cloud.clients.aws.s3.write.IWriteBufferProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -public class CloudResettableInputStream extends InputStream { +public class CloudResettableInputStream extends InputStream implements ICloudWriter { private static final Logger LOGGER = LogManager.getLogger(); - // TODO: make configurable - public static final int MIN_BUFFER_SIZE = 5 * 1024 * 1024; private final IWriteBufferProvider bufferProvider; private ByteBuffer writeBuffer; @@ -41,12 +39,10 @@ this.bufferProvider = bufferProvider; } - private void open() { - if (writeBuffer == null) { - writeBuffer = bufferProvider.getBuffer(); - writeBuffer.clear(); - } - } + /* ************************************************************ + * InputStream methods + * ************************************************************ + */ @Override public void reset() { @@ -63,16 +59,25 @@ writeBuffer.mark(); } - public void write(ByteBuffer header, ByteBuffer page) throws HyracksDataException { - write(header); - write(page); + /* ************************************************************ + * ICloudWriter methods + * ************************************************************ + */ + + @Override + public int write(ByteBuffer header, ByteBuffer page) throws HyracksDataException { + int writtenBytes = write(header); + writtenBytes += write(page); + return writtenBytes; } + @Override public int write(ByteBuffer page) throws HyracksDataException { open(); return write(page.array(), 0, page.limit()); } + @Override public void write(int b) throws HyracksDataException { if (writeBuffer.remaining() == 0) { uploadAndWait(); @@ -80,6 +85,7 @@ 
writeBuffer.put((byte) b); } + @Override public int write(byte[] b, int off, int len) throws HyracksDataException { open(); @@ -108,6 +114,7 @@ return len; } + @Override public void finish() throws HyracksDataException { open(); try { @@ -126,6 +133,7 @@ } } + @Override public void abort() throws HyracksDataException { try { bufferedWriter.abort(); @@ -162,6 +170,13 @@ return writeBuffer.get(); } + private void open() { + if (writeBuffer == null) { + writeBuffer = bufferProvider.getBuffer(); + writeBuffer.clear(); + } + } + private void returnBuffer() { if (writeBuffer != null) { bufferProvider.recycle(writeBuffer); diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java index 7941ada..f9c43f8 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudClient.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Set; +import org.apache.asterix.cloud.clients.aws.s3.write.IWriteBufferProvider; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; import org.apache.hyracks.control.nc.io.IOManager; @@ -35,15 +36,20 @@ * Interface containing methods to perform IO operation on the Cloud Storage */ public interface ICloudClient { + /** + * @return write buffer size + */ + int getWriteBufferSize(); /** * Creates a cloud buffered writer * - * @param bucket bucket to write to - * @param path path to write to - * @return buffered writer + * @param bucket bucket to write to + * @param path path to write to + * @param bufferProvider buffer provider + * @return cloud writer */ - ICloudBufferedWriter createBufferedWriter(String bucket, String path); + ICloudWriter createdWriter(String bucket, String path, IWriteBufferProvider bufferProvider); /** * 
Lists objects at the specified bucket and path, and applies the file name filter on the returned objects diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java new file mode 100644 index 0000000..289c496 --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.asterix.cloud.clients; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +/** + * A cloud-based writer that write bytes sequentially in a cloud blob storage + */ +public interface ICloudWriter { + /** + * Write a header and a page + * + * @param header to write + * @param page to write + * @return written bytes + */ + int write(ByteBuffer header, ByteBuffer page) throws HyracksDataException; + + /** + * Write a page + * + * @param page to write + * @return written bytes + */ + int write(ByteBuffer page) throws HyracksDataException; + + /** + * Write a byte + * + * @param b to write + */ + void write(int b) throws HyracksDataException; + + /** + * Write a byte array + * + * @param b bytes to write + * @param off starting offset + * @param len length to write + * @return written bytes + */ + int write(byte[] b, int off, int len) throws HyracksDataException; + + /** + * Finish the write operation + * Note: this should be called upon successful write + */ + void finish() throws HyracksDataException; + + /** + * Abort the write operation + * Note: should be called instead of {@link #finish()} when the write operation encountered an error + */ + void abort() throws HyracksDataException; + + /** + * Close and release resources + * + * @throws IOException instead of {@link HyracksDataException} to conform with {@link InputStream#close()} + */ + void close() throws IOException; +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/NoOpCloudWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/NoOpCloudWriter.java new file mode 100644 index 0000000..623a9d0 --- /dev/null +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/NoOpCloudWriter.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.cloud.clients; + +import java.nio.ByteBuffer; + +import org.apache.hyracks.api.exceptions.HyracksDataException; + +public final class NoOpCloudWriter implements ICloudWriter { + public static final ICloudWriter INSTANCE = new NoOpCloudWriter(); + private static final String NOT_WRITEABLE_ERR_MSG = "Cannot overwrite an existing file"; + + private NoOpCloudWriter() { + } + + @Override + public int write(ByteBuffer header, ByteBuffer page) throws HyracksDataException { + throw new IllegalStateException(NOT_WRITEABLE_ERR_MSG); + } + + @Override + public int write(ByteBuffer page) throws HyracksDataException { + throw new IllegalStateException(NOT_WRITEABLE_ERR_MSG); + } + + @Override + public void write(int b) throws HyracksDataException { + throw new IllegalStateException(NOT_WRITEABLE_ERR_MSG); + } + + @Override + public int write(byte[] b, int off, int len) throws HyracksDataException { + throw new IllegalStateException(NOT_WRITEABLE_ERR_MSG); + } + + @Override + public void finish() throws HyracksDataException { + throw new IllegalStateException(NOT_WRITEABLE_ERR_MSG); + } + + @Override + public void abort() throws HyracksDataException { + throw new IllegalStateException(NOT_WRITEABLE_ERR_MSG); + } + + @Override + public void 
close() { + // NoOp + } +} diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java index bc13078..c6894ce 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3ClientConfig.java @@ -22,12 +22,14 @@ import org.apache.asterix.common.config.CloudProperties; import org.apache.asterix.external.util.aws.s3.S3Constants; +import org.apache.hyracks.util.StorageUtil; import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; public final class S3ClientConfig { + public static final int MIN_BUFFER_SIZE = StorageUtil.getIntSizeInBytes(5, StorageUtil.StorageUnit.MEGABYTE); // The maximum number of file that can be deleted (AWS restriction) static final int DELETE_BATCH_SIZE = 1000; private final String region; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java index 5cdf971..9cd993c 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3CloudClient.java @@ -35,9 +35,13 @@ import java.util.List; import java.util.Set; +import org.apache.asterix.cloud.clients.CloudResettableInputStream; import org.apache.asterix.cloud.clients.ICloudBufferedWriter; import org.apache.asterix.cloud.clients.ICloudClient; +import org.apache.asterix.cloud.clients.ICloudWriter; import org.apache.asterix.cloud.clients.IParallelDownloader; +import 
org.apache.asterix.cloud.clients.aws.s3.write.IWriteBufferProvider; +import org.apache.asterix.cloud.clients.aws.s3.write.S3BufferedWriter; import org.apache.asterix.cloud.clients.profiler.CountRequestProfiler; import org.apache.asterix.cloud.clients.profiler.IRequestProfiler; import org.apache.asterix.cloud.clients.profiler.NoOpRequestProfiler; @@ -69,7 +73,7 @@ import software.amazon.awssdk.services.s3.model.S3Object; @ThreadSafe -public class S3CloudClient implements ICloudClient { +public final class S3CloudClient implements ICloudClient { private final S3ClientConfig config; private final S3Client s3Client; private final IRequestProfiler profiler; @@ -90,8 +94,14 @@ } @Override - public ICloudBufferedWriter createBufferedWriter(String bucket, String path) { - return new S3BufferedWriter(s3Client, profiler, bucket, path); + public int getWriteBufferSize() { + return S3ClientConfig.MIN_BUFFER_SIZE; + } + + @Override + public ICloudWriter createdWriter(String bucket, String path, IWriteBufferProvider bufferProvider) { + ICloudBufferedWriter bufferedWriter = new S3BufferedWriter(s3Client, profiler, bucket, path); + return new CloudResettableInputStream(bufferedWriter, bufferProvider); } @Override diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/IWriteBufferProvider.java similarity index 94% rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/IWriteBufferProvider.java index 693b73a..b61de23 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/IWriteBufferProvider.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/IWriteBufferProvider.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under 
the License. */ -package org.apache.asterix.cloud; +package org.apache.asterix.cloud.clients.aws.s3.write; import java.nio.ByteBuffer; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/S3BufferedWriter.java similarity index 98% rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/S3BufferedWriter.java index a6579c2..59e7293 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/S3BufferedWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/S3BufferedWriter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.asterix.cloud.clients.aws.s3; +package org.apache.asterix.cloud.clients.aws.s3.write; import java.io.InputStream; import java.util.ArrayList; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/WriteBufferProvider.java similarity index 77% rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/WriteBufferProvider.java index ee17400..5b994ea 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriteBufferProvider.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/WriteBufferProvider.java @@ -16,9 +16,7 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.cloud; - -import static org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE; +package org.apache.asterix.cloud.clients.aws.s3.write; import java.nio.ByteBuffer; import java.util.concurrent.ArrayBlockingQueue; @@ -27,11 +25,13 @@ import org.apache.hyracks.util.annotations.ThreadSafe; @ThreadSafe -public class WriteBufferProvider implements IWriteBufferProvider { +public final class WriteBufferProvider implements IWriteBufferProvider { + private final int bufferSize; private final BlockingQueue<ByteBuffer> writeBuffers; - public WriteBufferProvider(int ioParallelism) { - writeBuffers = new ArrayBlockingQueue<>(ioParallelism); + public WriteBufferProvider(int bufferSize, int numberOfBuffers) { + this.bufferSize = bufferSize; + writeBuffers = new ArrayBlockingQueue<>(numberOfBuffers); } @Override @@ -43,7 +43,7 @@ public ByteBuffer getBuffer() { ByteBuffer writeBuffer = writeBuffers.poll(); if (writeBuffer == null) { - return ByteBuffer.allocate(MIN_BUFFER_SIZE); + return ByteBuffer.allocate(bufferSize); } return writeBuffer; } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/WriterSingleBufferProvider.java similarity index 79% rename from asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java rename to asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/WriterSingleBufferProvider.java index 287900d..8d388d9 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/WriterSingleBufferProvider.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/aws/s3/write/WriterSingleBufferProvider.java @@ -16,21 +16,18 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.asterix.cloud; - -import static org.apache.asterix.cloud.CloudResettableInputStream.MIN_BUFFER_SIZE; +package org.apache.asterix.cloud.clients.aws.s3.write; import java.nio.ByteBuffer; import org.apache.hyracks.util.annotations.NotThreadSafe; @NotThreadSafe -public class WriterSingleBufferProvider implements IWriteBufferProvider { - +public final class WriterSingleBufferProvider implements IWriteBufferProvider { private final ByteBuffer buffer; - public WriterSingleBufferProvider() { - buffer = ByteBuffer.allocate(MIN_BUFFER_SIZE); + public WriterSingleBufferProvider(int size) { + buffer = ByteBuffer.allocate(size); } @Override diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java index 8f803a0..534ff5d 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ILazyAccessor.java @@ -25,15 +25,13 @@ import org.apache.asterix.cloud.bulk.IBulkOperationCallBack; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.IIOManager; public interface ILazyAccessor { boolean isLocalAccessor(); IBulkOperationCallBack getBulkOperationCallBack(); - void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode, IIOManager.FileSyncMode syncMode) - throws HyracksDataException; + void doOnOpen(CloudFileHandle fileHandle) throws HyracksDataException; Set<FileReference> doList(FileReference dir, FilenameFilter filter) throws HyracksDataException; diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java index 378cf03..ae32402 
100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/LocalAccessor.java @@ -27,7 +27,6 @@ import org.apache.asterix.cloud.clients.ICloudClient; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.control.nc.io.IOManager; /** @@ -50,8 +49,7 @@ } @Override - public void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode, - IIOManager.FileSyncMode syncMode) throws HyracksDataException { + public void doOnOpen(CloudFileHandle fileHandle) throws HyracksDataException { // NoOp } diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java index bfa353a..6e52d0b 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/lazy/accessor/ReplaceableCloudAccessor.java @@ -30,7 +30,6 @@ import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.io.FileReference; -import org.apache.hyracks.api.io.IIOManager; import org.apache.hyracks.control.nc.io.IOManager; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -70,8 +69,7 @@ } @Override - public void doOnOpen(CloudFileHandle fileHandle, IIOManager.FileReadWriteMode rwMode, - IIOManager.FileSyncMode syncMode) throws HyracksDataException { + public void doOnOpen(CloudFileHandle fileHandle) throws HyracksDataException { FileReference fileRef = fileHandle.getFileReference(); if (!localIoManager.exists(fileRef) && 
cloudClient.exists(bucket, fileRef.getRelativePath())) { if (cacher.downloadData(fileRef)) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java index bbae29a..4e799af 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/AbstractCloudExternalFileWriter.java @@ -21,11 +21,10 @@ import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import org.apache.asterix.cloud.CloudOutputStream; -import org.apache.asterix.cloud.CloudResettableInputStream; -import org.apache.asterix.cloud.IWriteBufferProvider; -import org.apache.asterix.cloud.WriterSingleBufferProvider; -import org.apache.asterix.cloud.clients.ICloudBufferedWriter; import org.apache.asterix.cloud.clients.ICloudClient; +import org.apache.asterix.cloud.clients.ICloudWriter; +import org.apache.asterix.cloud.clients.aws.s3.write.IWriteBufferProvider; +import org.apache.asterix.cloud.clients.aws.s3.write.WriterSingleBufferProvider; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.common.exceptions.RuntimeDataException; import org.apache.asterix.runtime.writer.IExternalFileWriter; @@ -46,7 +45,7 @@ private final IWarningCollector warningCollector; private final SourceLocation pathSourceLocation; private final IWriteBufferProvider bufferProvider; - private ICloudBufferedWriter bufferedWriter; + private ICloudWriter cloudWriter; AbstractCloudExternalFileWriter(IExternalPrinter printer, ICloudClient cloudClient, String bucket, boolean partitionedPath, IWarningCollector warningCollector, SourceLocation pathSourceLocation) { @@ -56,7 +55,7 @@ this.partitionedPath = partitionedPath; this.warningCollector = warningCollector; this.pathSourceLocation = 
pathSourceLocation; - bufferProvider = new WriterSingleBufferProvider(); + bufferProvider = new WriterSingleBufferProvider(cloudClient.getWriteBufferSize()); } @Override @@ -82,10 +81,8 @@ return false; } - bufferedWriter = cloudClient.createBufferedWriter(bucket, fullPath); - CloudResettableInputStream inputStream = new CloudResettableInputStream(bufferedWriter, bufferProvider); - - CloudOutputStream outputStream = new CloudOutputStream(inputStream); + cloudWriter = cloudClient.createdWriter(bucket, fullPath, bufferProvider); + CloudOutputStream outputStream = new CloudOutputStream(cloudWriter); printer.newStream(outputStream); return true; @@ -108,8 +105,8 @@ @Override public final void abort() throws HyracksDataException { try { - if (bufferedWriter != null) { - bufferedWriter.abort(); + if (cloudWriter != null) { + cloudWriter.abort(); } printer.close(); } catch (HyracksDataException e) { diff --git a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java index cdaa6dc..3c0300e 100644 --- a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java +++ b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/writer/S3ExternalFileWriterFactory.java @@ -26,12 +26,12 @@ import java.util.Map; import java.util.Random; -import org.apache.asterix.cloud.CloudResettableInputStream; -import org.apache.asterix.cloud.WriterSingleBufferProvider; -import org.apache.asterix.cloud.clients.ICloudBufferedWriter; import org.apache.asterix.cloud.clients.ICloudClient; +import org.apache.asterix.cloud.clients.ICloudWriter; import org.apache.asterix.cloud.clients.aws.s3.S3ClientConfig; import org.apache.asterix.cloud.clients.aws.s3.S3CloudClient; +import org.apache.asterix.cloud.clients.aws.s3.write.IWriteBufferProvider; +import 
org.apache.asterix.cloud.clients.aws.s3.write.WriterSingleBufferProvider; import org.apache.asterix.common.exceptions.CompilationException; import org.apache.asterix.common.exceptions.ErrorCode; import org.apache.asterix.external.util.ExternalDataConstants; @@ -168,19 +168,18 @@ long writeValue = random.nextLong(); byte[] data = new byte[Long.BYTES]; LongPointable.setLong(data, 0, writeValue); - ICloudBufferedWriter writer = testClient.createBufferedWriter(bucket, path); - CloudResettableInputStream stream = null; + IWriteBufferProvider bufferProvider = new WriterSingleBufferProvider(cloudClient.getWriteBufferSize()); + ICloudWriter writer = testClient.createdWriter(bucket, path, bufferProvider); boolean aborted = false; try { - stream = new CloudResettableInputStream(writer, new WriterSingleBufferProvider()); - stream.write(data, 0, data.length); + writer.write(data, 0, data.length); } catch (HyracksDataException e) { - stream.abort(); + writer.abort(); aborted = true; } finally { - if (stream != null && !aborted) { - stream.finish(); - stream.close(); + if (writer != null && !aborted) { + writer.finish(); + writer.close(); } } diff --git a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java index 92d7f12..c30cb13 100644 --- a/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java +++ b/asterixdb/asterix-cloud/src/test/java/org/apache/asterix/cloud/LSMTest.java @@ -23,8 +23,10 @@ import java.nio.ByteBuffer; import java.util.Collections; -import org.apache.asterix.cloud.clients.ICloudBufferedWriter; import org.apache.asterix.cloud.clients.ICloudClient; +import org.apache.asterix.cloud.clients.ICloudWriter; +import org.apache.asterix.cloud.clients.aws.s3.write.IWriteBufferProvider; +import org.apache.asterix.cloud.clients.aws.s3.write.WriterSingleBufferProvider; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; 
import org.junit.FixMethodOrder; @@ -53,28 +55,27 @@ @Test public void a1writeToS3Test() throws IOException { - CloudResettableInputStream stream = null; + IWriteBufferProvider bufferProvider = new WriterSingleBufferProvider(CLOUD_CLIENT.getWriteBufferSize()); + ICloudWriter cloudWriter = + CLOUD_CLIENT.createdWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b", bufferProvider); try { - ICloudBufferedWriter s3BufferedWriter = - CLOUD_CLIENT.createBufferedWriter(PLAYGROUND_CONTAINER, BUCKET_STORAGE_ROOT + "/0_b"); - stream = new CloudResettableInputStream(s3BufferedWriter, new WriteBufferProvider(1)); ByteBuffer content = createContent(BUFFER_SIZE); int size = 0; for (int i = 0; i < 10; i++) { content.clear(); - size += stream.write(content); + size += cloudWriter.write(content); } - stream.finish(); + cloudWriter.finish(); System.err.println(size); } catch (Exception e) { e.printStackTrace(); - if (stream != null) { - stream.abort(); + if (cloudWriter != null) { + cloudWriter.abort(); } } finally { - if (stream != null) { - stream.close(); + if (cloudWriter != null) { + cloudWriter.close(); } } } -- To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18259 To unsubscribe, or for help writing mail filters, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Change-Id: I52124696b50d6dcc1f3c65b7e0fe251df1579ac5 Gerrit-Change-Number: 18259 Gerrit-PatchSet: 1 Gerrit-Owner: Wail Alkowaileet <wael....@gmail.com> Gerrit-MessageType: newchange