BenWhitehead commented on code in PR #37900:
URL: https://github.com/apache/beam/pull/37900#discussion_r3191280260
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java:
##########
@@ -481,4 +493,141 @@ public void removeBucket(BucketInfo bucketInfo) throws
IOException {
throw translateStorageException(bucketInfo.getName(), null, e);
}
}
+
+  /**
+   * A bridge that allows a GCS {@link ReadChannel} to behave as a read-only
+   * {@link SeekableByteChannel}.
+   *
+   * <p>The current position is tracked locally: it advances by the number of bytes each read
+   * returns and is overwritten on every seek. The reported size is the value supplied at
+   * construction and is never refreshed.
+   */
+  private static class GcsSeekableByteChannel implements SeekableByteChannel {
+    private final ReadChannel reader;
+    private final long size;
+    private long position = 0;
+
+    /**
+     * @param reader the GCS reader that all reads and seeks are delegated to
+     * @param size the value to report from {@link #size()}
+     */
+    GcsSeekableByteChannel(ReadChannel reader, long size) {
+      this.reader = reader;
+      this.size = size;
+    }
+
+    /**
+     * Fails when the underlying reader is closed; the {@link SeekableByteChannel} contract
+     * requires a {@code ClosedChannelException} from read, position and size in that case.
+     */
+    private void ensureOpen() throws IOException {
+      if (!reader.isOpen()) {
+        throw new java.nio.channels.ClosedChannelException();
+      }
+    }
+
+    /**
+     * Reads from the underlying reader into {@code dst} and advances the tracked position.
+     *
+     * @return whatever the underlying reader returned
+     * @throws IOException if the channel is closed or the underlying read fails
+     */
+    @Override
+    public int read(ByteBuffer dst) throws IOException {
+      ensureOpen();
+      int count = reader.read(dst);
+      // Only a positive count moves the position; zero or a negative result leaves it unchanged.
+      if (count > 0) {
+        this.position += count;
+      }
+      return count;
+    }
+
+    /**
+     * Seeks the underlying reader to {@code newPosition} and records it as the current position.
+     *
+     * @throws IllegalArgumentException if {@code newPosition} is negative
+     * @throws IOException if the channel is closed or the seek fails
+     */
+    @Override
+    public SeekableByteChannel position(long newPosition) throws IOException {
+      ensureOpen();
+      checkArgument(newPosition >= 0, "Position must be non-negative: %s", newPosition);
+      reader.seek(newPosition);
+      this.position = newPosition;
+      return this;
+    }
+
+    /** Returns the locally tracked position; fails if the channel is closed. */
+    @Override
+    public long position() throws IOException {
+      ensureOpen();
+      return this.position;
+    }
+
+    /** Returns the size supplied at construction; fails if the channel is closed. */
+    @Override
+    public long size() throws IOException {
+      ensureOpen();
+      return size;
+    }
+
+    /** Always throws: this channel is read-only. */
+    @Override
+    public SeekableByteChannel truncate(long size) throws IOException {
+      throw new UnsupportedOperationException(
+          "GcsSeekableByteChannels are read-only and cannot be truncated.");
+    }
+
+    /** Always throws: this channel is read-only. */
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      throw new UnsupportedOperationException(
+          "GcsSeekableByteChannels are read-only and do not support writing.");
+    }
+
+    @Override
+    public boolean isOpen() {
+      return reader.isOpen();
+    }
+
+    /** Closes the underlying reader if it is still open; otherwise does nothing. */
+    @Override
+    public void close() throws IOException {
+      if (isOpen()) {
+        reader.close();
+      }
+    }
+  }
+
+ public SeekableByteChannel open(GcsPath path, BlobSourceOption...
sourceOptions)
+ throws IOException {
+ Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE));
+ return new GcsSeekableByteChannel(
+ blob.getStorage().reader(blob.getBlobId(), sourceOptions),
blob.getSize());
+ }
+
+  /**
+   * A bridge that allows a GCS {@link WriteChannel} to behave as a {@link WritableByteChannel}.
+   *
+   * <p>Any {@link StorageException} raised by the underlying writer, on write or on close, is
+   * passed through {@code translateStorageException} together with the destination path.
+   */
+  private static class GcsWritableByteChannel implements WritableByteChannel {
+    private final WriteChannel writer;
+    private final GcsPath gcsPath;
+
+    /**
+     * @param writer the GCS writer that all writes are delegated to
+     * @param gcsPath the destination object, used when translating storage errors
+     */
+    GcsWritableByteChannel(WriteChannel writer, GcsPath gcsPath) {
+      this.writer = writer;
+      this.gcsPath = gcsPath;
+    }
+
+    /**
+     * Writes {@code src} to the underlying writer.
+     *
+     * @return whatever the underlying writer returned
+     */
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      try {
+        return writer.write(src);
+      } catch (StorageException e) {
+        throw translateStorageException(gcsPath, e);
+      }
+    }
+
+    @Override
+    public boolean isOpen() {
+      return writer.isOpen();
+    }
+
+    /**
+     * Closes the underlying writer.
+     *
+     * <p>Storage errors are translated the same way as in {@link #write(ByteBuffer)}, so
+     * callers see one consistent exception shape for the whole upload.
+     * NOTE(review): closing presumably flushes any buffered data and finalizes the upload,
+     * which is why failures can surface here. Confirm against the WriteChannel implementation.
+     */
+    @Override
+    public void close() throws IOException {
+      try {
+        writer.close();
+      } catch (StorageException e) {
+        throw translateStorageException(gcsPath, e);
+      }
+    }
+  }
+
+ public WritableByteChannel create(
+ GcsPath path, GcsUtilV1.CreateOptions options, BlobWriteOption...
writeOptions)
+ throws IOException {
+ try {
+ // Define the metadata for the new object
+ BlobInfo.Builder builder = BlobInfo.newBuilder(path.getBucket(),
path.getObject());
+ String type = options.getContentType();
+ if (type != null) {
+ builder.setContentType(type);
+ }
+
+ BlobInfo blobInfo = builder.build();
+
+ List<BlobWriteOption> writeOptionList = new
ArrayList<>(Arrays.asList(writeOptions));
+ if (options.getExpectFileToNotExist()) {
+ writeOptionList.add(BlobWriteOption.doesNotExist());
+ }
Review Comment:
LGTM
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]