kunwp1 commented on code in PR #4067:
URL: https://github.com/apache/texera/pull/4067#discussion_r2561157566
##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/BigObjectInputStream.scala:
##########
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.texera.service.util
+
+import org.apache.amber.core.tuple.BigObject
+
+import java.io.InputStream
+
+/**
+ * InputStream for reading BigObject data from S3.
+ *
+ * The underlying S3 download is lazily initialized on first read.
+ * The stream will fail if the S3 object doesn't exist when a read is attempted.
+ *
+ * Usage:
+ * {{{
+ * val bigObject: BigObject = ...
+ * scala.util.Using.resource(new BigObjectInputStream(bigObject)) { in =>
+ *   val bytes = in.readAllBytes()
+ * }
+ * }}}
+ */
+class BigObjectInputStream(bigObject: BigObject) extends InputStream {
+
+  require(bigObject != null, "BigObject cannot be null")
+
+  // Lazy initialization - downloads only when first read() is called
+  private lazy val underlying: InputStream =
+    S3StorageClient.downloadObject(bigObject.getBucketName, bigObject.getObjectKey)
+
+  @volatile private var closed = false
+
+  override def read(): Int = {
+    ensureOpen()
+    underlying.read()
+  }
+
+  override def read(b: Array[Byte], off: Int, len: Int): Int = {
+    ensureOpen()
+    underlying.read(b, off, len)
+  }
+
+  override def readAllBytes(): Array[Byte] = {
+    ensureOpen()
+    underlying.readAllBytes()
+  }
+
+  override def readNBytes(n: Int): Array[Byte] = {
+    ensureOpen()
+    underlying.readNBytes(n)
+  }
+
+  override def skip(n: Long): Long = {
+    ensureOpen()
+    underlying.skip(n)
+  }
+
+  override def available(): Int = {
+    ensureOpen()
+    underlying.available()
+  }
+
+  override def close(): Unit = {
+    if (!closed) {
+      closed = true
+      if (underlying != null) { // Only close if initialized
+        underlying.close()
+      }
+    }
+  }
+
+  override def markSupported(): Boolean = {
+    ensureOpen()
+    underlying.markSupported()
+  }
+
+  override def mark(readlimit: Int): Unit = {
+    ensureOpen()
+    underlying.mark(readlimit)
+  }
+
+  override def reset(): Unit = {
+    ensureOpen()
+    underlying.reset()
+  }
+
+  private def ensureOpen(): Unit = {
+    if (closed) throw new java.io.IOException("Stream is closed")
+  }

Review Comment:
   Thanks! I changed both BigObjectInputStream and BigObjectOutputStream.
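For context, a minimal caller-side sketch of the stream above (the helper name `readBigObjectFully` is hypothetical and not part of this PR; it assumes Scala 2.13+ for `scala.util.Using`). Because `underlying` is a lazy val, no S3 request is issued until the first read touches the stream:

```scala
import scala.util.Using

import org.apache.amber.core.tuple.BigObject
import org.apache.texera.service.util.BigObjectInputStream

// Hypothetical helper, not part of this PR: reads one BigObject fully.
// The first read() forces the lazy S3 download; Using.resource guarantees
// close() runs even if the download or the read fails.
def readBigObjectFully(bigObject: BigObject): Array[Byte] =
  Using.resource(new BigObjectInputStream(bigObject)) { in =>
    in.readAllBytes()
  }
```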
##########
common/workflow-core/src/main/scala/org/apache/texera/service/util/S3StorageClient.scala:
##########
@@ -139,4 +141,118 @@ object S3StorageClient {
       s3Client.deleteObjects(deleteObjectsRequest)
     }
   }
+
+  /**
+   * Uploads an object to S3 using multipart upload.
+   * Handles streams of any size without loading the entire stream into memory.
+   */
+  def uploadObject(bucketName: String, objectKey: String, inputStream: InputStream): String = {
+    val buffer = new Array[Byte](MINIMUM_NUM_OF_MULTIPART_S3_PART.toInt)
+
+    // Helper to read a full buffer from the input stream
+    def readChunk(): Int = {
+      var offset = 0
+      var read = 0
+      while (
+        offset < buffer.length && {
+          read = inputStream.read(buffer, offset, buffer.length - offset); read > 0
+        }
+      ) {
+        offset += read
+      }
+      offset
+    }
+
+    // Read first chunk to check if the stream is empty
+    val firstChunkSize = readChunk()
+    if (firstChunkSize == 0) {
+      return s3Client
+        .putObject(
+          PutObjectRequest.builder().bucket(bucketName).key(objectKey).build(),
+          RequestBody.fromBytes(Array.empty[Byte])
+        )
+        .eTag()
+    }
+
+    val uploadId = s3Client
+      .createMultipartUpload(
+        CreateMultipartUploadRequest.builder().bucket(bucketName).key(objectKey).build()
+      )
+      .uploadId()
+
+    try {
+      // Upload all parts using an iterator
+      val allParts = Iterator
+        .iterate((1, firstChunkSize)) { case (partNum, _) => (partNum + 1, readChunk()) }
+        .takeWhile { case (_, size) => size > 0 }
+        .map {
+          case (partNumber, chunkSize) =>
+            val eTag = s3Client
+              .uploadPart(
+                UploadPartRequest
+                  .builder()
+                  .bucket(bucketName)
+                  .key(objectKey)
+                  .uploadId(uploadId)
+                  .partNumber(partNumber)
+                  .build(),
+                RequestBody.fromBytes(buffer.take(chunkSize))
+              )
+              .eTag()
+            CompletedPart.builder().partNumber(partNumber).eTag(eTag).build()
+        }
+        .toList
+
+      s3Client
+        .completeMultipartUpload(
+          CompleteMultipartUploadRequest
+            .builder()
+            .bucket(bucketName)
+            .key(objectKey)
+            .uploadId(uploadId)
+            .multipartUpload(CompletedMultipartUpload.builder().parts(allParts.asJava).build())
+            .build()
+        )
+        .eTag()
+
+    } catch {
+      case e: Exception =>
+        try {
+          s3Client.abortMultipartUpload(
+            AbortMultipartUploadRequest
+              .builder()
+              .bucket(bucketName)
+              .key(objectKey)
+              .uploadId(uploadId)
+              .build()
+          )
+        } catch { case _: Exception => }

Review Comment:
   Thanks! Changed.
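For context on why `readChunk()` loops until the buffer is full instead of taking whatever a single `read()` call returns: S3 requires every non-final multipart part to meet the minimum part size, and `InputStream.read` may legally return fewer bytes than requested. A standalone sketch of the same fill-the-buffer pattern (the names `readFully` and the demo values are illustrative, not from the PR):

```scala
import java.io.{ByteArrayInputStream, InputStream}

// Standalone sketch of the fill-the-buffer read loop used by uploadObject.
// A single InputStream.read() may return fewer bytes than requested, so we
// keep reading until the buffer is full or the stream is exhausted.
def readFully(in: InputStream, buffer: Array[Byte]): Int = {
  var offset = 0
  var read = 0
  while (
    offset < buffer.length && {
      read = in.read(buffer, offset, buffer.length - offset); read > 0
    }
  ) {
    offset += read
  }
  offset // number of valid bytes; less than buffer.length only on the last chunk
}

// Tiny demo: a 10-byte stream read through a 4-byte buffer yields
// chunks of 4, 4, and 2 bytes; only the final chunk may be short.
val in = new ByteArrayInputStream(Array.fill[Byte](10)(1))
val buf = new Array[Byte](4)
Iterator
  .continually(readFully(in, buf))
  .takeWhile(_ > 0)
  .foreach(n => println(s"chunk of $n bytes"))
```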
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]