shunping commented on code in PR #37900:
URL: https://github.com/apache/beam/pull/37900#discussion_r3190516905
##########
sdks/java/extensions/google-cloud-platform-core/src/main/java/org/apache/beam/sdk/extensions/gcp/util/GcsUtilV2.java:
##########
@@ -481,4 +493,141 @@ public void removeBucket(BucketInfo bucketInfo) throws IOException {
throw translateStorageException(bucketInfo.getName(), null, e);
}
}
+
+ /** A bridge that allows a GCS ReadChannel to behave as a
SeekableByteChannel. */
+ private static class GcsSeekableByteChannel implements SeekableByteChannel {
+ private final ReadChannel reader;
+ private final long size;
+ private long position = 0;
+
+ GcsSeekableByteChannel(ReadChannel reader, long size) {
+ this.reader = reader;
+ this.size = size;
+ this.position = 0;
+ }
+
+ @Override
+ public int read(ByteBuffer dst) throws IOException {
+ int count = reader.read(dst);
+ if (count > 0) {
+ this.position += count;
+ }
+ return count;
+ }
+
+ @Override
+ public SeekableByteChannel position(long newPosition) throws IOException {
+ checkArgument(newPosition >= 0, "Position must be non-negative: %s",
newPosition);
+ reader.seek(newPosition);
+ this.position = newPosition;
+ return this;
+ }
+
+ @Override
+ public long position() throws IOException {
+ return this.position;
+ }
+
+ @Override
+ public long size() throws IOException {
+ return size;
+ }
+
+ @Override
+ public SeekableByteChannel truncate(long size) throws IOException {
+ throw new UnsupportedOperationException(
+ "GcsSeekableByteChannels are read-only and cannot be truncated.");
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ throw new UnsupportedOperationException(
+ "GcsSeekableByteChannel are read-only and does not support
writing.");
+ }
+
+ @Override
+ public boolean isOpen() {
+ return reader.isOpen();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (isOpen()) {
+ reader.close();
+ }
+ }
+ }
+
+ public SeekableByteChannel open(GcsPath path, BlobSourceOption...
sourceOptions)
+ throws IOException {
+ Blob blob = getBlob(path, BlobGetOption.fields(BlobField.SIZE));
+ return new GcsSeekableByteChannel(
+ blob.getStorage().reader(blob.getBlobId(), sourceOptions),
blob.getSize());
Review Comment:
Why do we need to disable internal buffering here? Will that have any performance implications?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]