chamikaramj commented on a change in pull request #14262:
URL: https://github.com/apache/beam/pull/14262#discussion_r597365590
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
##########
@@ -146,8 +164,19 @@ public String toString() {
return readStream.toString();
}
+ @Override
+ public long getMaxEndOffset(PipelineOptions options) throws Exception {
+ return getEndOffset();
+ }
+
+ @Override
+ public OffsetBasedSource<T> createSourceForSubrange(long start, long end) {
+ return this;
Review comment:
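This is incorrect. The runner may split the source at various ranges and
run a work item for each split. Since this method returns the same source
regardless of the requested range, each work item would read the full
stream, which will result in duplicate data if this change is committed.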
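To illustrate the concern, here is a minimal sketch using a hypothetical
stand-in class rather than the real Beam API. RangeSource, BrokenRangeSource,
and splitAndRead are names invented for this example. It assumes a runner that
splits a source at one offset and reads both halves. The variant that returns
`this` emits every element twice.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an offset-based source; this is not the Beam API.
final class SubrangeSketch {

  /** A source that reads the integer offsets in [start, end). */
  static class RangeSource {
    final long start;
    final long end;

    RangeSource(long start, long end) {
      this.start = start;
      this.end = end;
    }

    /** The returned source covers only the requested subrange. */
    RangeSource createSourceForSubrange(long subStart, long subEnd) {
      return new RangeSource(subStart, subEnd);
    }

    /** Reads every offset in this source's range. */
    List<Long> read() {
      List<Long> out = new ArrayList<>();
      for (long i = start; i < end; i++) {
        out.add(i);
      }
      return out;
    }
  }

  /** Variant mirroring the diff: ignores the requested range. */
  static class BrokenRangeSource extends RangeSource {
    BrokenRangeSource(long start, long end) {
      super(start, end);
    }

    @Override
    RangeSource createSourceForSubrange(long subStart, long subEnd) {
      return this;
    }
  }

  /** Simulates a runner that splits at mid and runs a work item per split. */
  static List<Long> splitAndRead(RangeSource source, long mid) {
    List<Long> out = new ArrayList<>();
    out.addAll(source.createSourceForSubrange(source.start, mid).read());
    out.addAll(source.createSourceForSubrange(mid, source.end).read());
    return out;
  }
}
```

With a range of [0, 4) split at 2, the first variant yields each offset once,
while the second yields the whole range once per split.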
##########
File path:
sdks/java/core/src/main/java/org/apache/beam/sdk/io/OffsetBasedSource.java
##########
@@ -337,7 +337,7 @@ public boolean allowsDynamicSplitting() {
}
@Override
- public final synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
+ public synchronized OffsetBasedSource<T> splitAtFraction(double fraction) {
Review comment:
I do not know why a source that works purely on an offset range would
want to override this behavior. This method is intentionally final, since
all offset-based sources are expected to split this way. If the splitting
behavior of the BQ Storage API source is significantly different, I think it
would be better to copy and adapt this code rather than override
OffsetBasedSource.
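For context, the following is a simplified model of fraction-based splitting
over an offset range. It is not Beam's actual implementation. RangeReader and
its fields are hypothetical, and the rounding rule is an assumption. It only
shows the general shape of the behavior that offset-based readers are expected
to share: map the fraction to an offset, reject splits at or before the
current position, and otherwise shrink the primary range and hand back the
residual.

```java
// Simplified model of dynamic splitting on an offset range; not Beam's code.
final class SplitSketch {

  static final class RangeReader {
    private final long start;
    private long end;     // exclusive; shrinks when a split succeeds
    private long current; // last offset claimed by the reader

    RangeReader(long start, long end) {
      this.start = start;
      this.end = end;
      this.current = start - 1; // nothing claimed yet
    }

    /** Claims the next offset, or returns false when the range is exhausted. */
    synchronized boolean advance() {
      if (current + 1 >= end) {
        return false;
      }
      current++;
      return true;
    }

    synchronized long end() {
      return end;
    }

    /**
     * Tries to split at the given fraction of the current range.
     * Returns the residual range as {from, to}, or null if the split is rejected.
     */
    synchronized long[] splitAtFraction(double fraction) {
      // Assumed rounding rule: round the split offset up.
      long splitOffset = start + (long) Math.ceil(fraction * (end - start));
      // Reject splits that fall on already-claimed offsets or leave no residual.
      if (splitOffset <= current || splitOffset >= end) {
        return null;
      }
      long[] residual = {splitOffset, end};
      end = splitOffset; // the primary keeps [start, splitOffset)
      return residual;
    }
  }
}
```

Because the primary and residual ranges never overlap, no offset is read
twice. That holds only if every subclass splits the same way, which a final
method guarantees.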