[ https://issues.apache.org/jira/browse/BEAM-7495?focusedWorklogId=260671&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-260671 ]
ASF GitHub Bot logged work on BEAM-7495:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Jun/19 19:31
            Start Date: 14/Jun/19 19:31
    Worklog Time Spent: 10m
      Work Description: aryann commented on pull request #8832: [BEAM-7495] Add dynamic worker rebalancing to BigQuery Storage
URL: https://github.com/apache/beam/pull/8832#discussion_r293947453

 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java
 ##########
 @@ -215,23 +223,101 @@ public T getCurrent() throws NoSuchElementException {
     }
 
     @Override
-    protected long getCurrentOffset() throws NoSuchElementException {
-      return currentOffset;
-    }
-
-    @Override
-    public void close() {
+    public synchronized void close() {
       storageClient.close();
     }
 
     @Override
     public synchronized BigQueryStorageStreamSource<T> getCurrentSource() {
-      return (BigQueryStorageStreamSource<T>) super.getCurrentSource();
+      return source;
     }
 
     @Override
-    public boolean allowsDynamicSplitting() {
-      return false;
+    public BoundedSource<T> splitAtFraction(double fraction) {
+      Metrics.counter(BigQueryStorageStreamReader.class, "split-at-fraction-calls").inc();
+      LOGGER.info(
+          "Received split request for stream '{}' at fraction {}.",
+          source.stream.getName(),
+          fraction);
+
+      SplitReadStreamRequest splitRequest =
+          SplitReadStreamRequest.newBuilder()
+              .setOriginalStream(source.stream)
+              // TODO(aryann): Once we rebuild the generated client code, we should change this to
+              // use setFraction().
+              .setUnknownFields(
+                  UnknownFieldSet.newBuilder()
+                      .addField(
+                          2,
+                          UnknownFieldSet.Field.newBuilder()
+                              .addFixed32(java.lang.Float.floatToIntBits((float) fraction))
+                              .build())
+                      .build())
+              .build();
+      SplitReadStreamResponse splitResponse = storageClient.splitReadStream(splitRequest);
+
+      if (!splitResponse.hasPrimaryStream() || !splitResponse.hasRemainderStream()) {
+        // No more splits are possible!
+        Metrics.counter(
+                BigQueryStorageStreamReader.class,
+                "split-at-fraction-calls-failed-due-to-impossible-split-point")
+            .inc();
+        LOGGER.info("Stream '{}' cannot be split at {}.", source.stream.getName(), fraction);

 Review comment:
 Done. Here and elsewhere.

Issue Time Tracking
-------------------

    Worklog Id:     (was: 260671)
    Time Spent: 1h 20m  (was: 1h 10m)
    Remaining Estimate: 502h 40m  (was: 502h 50m)

> Add support for dynamic worker re-balancing when reading BigQuery data using Cloud Dataflow
> -------------------------------------------------------------------------------------------
>
>                 Key: BEAM-7495
>                 URL: https://issues.apache.org/jira/browse/BEAM-7495
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Aryan Naraghi
>            Assignee: Aryan Naraghi
>            Priority: Major
>   Original Estimate: 504h
>          Time Spent: 1h 20m
>  Remaining Estimate: 502h 40m
>
> Currently, the BigQuery connector for reading data using the BigQuery Storage
> API does not support any of the facilities on the source for Dataflow to
> split streams.
>
> On the server side, the BigQuery Storage API supports splitting streams at a
> fraction. By adding support to the connector, we enable Dataflow to split
> streams, which unlocks dynamic worker re-balancing.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
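The `setUnknownFields(...)` workaround in the diff hand-encodes the split fraction as protobuf field 2 with the 32-bit (fixed32) wire type. Protobuf uses the same wire type to serialize a `float` field. The JDK-only sketch below shows which bytes such a field produces. It assumes, as the TODO about `setFraction()` implies, that field 2 of `SplitReadStreamRequest` is a `float fraction`. `FractionFieldEncoding` and `encodeFractionField` are hypothetical names used for illustration. They are not part of Beam or the BigQuery Storage client.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical illustration class; not part of Beam or the BigQuery Storage client.
class FractionFieldEncoding {
  // Protobuf wire type 5 is the 32-bit type used by fixed32, sfixed32 and float.
  private static final int WIRE_TYPE_FIXED32 = 5;

  // Encodes `fraction` as protobuf field `fieldNumber` with the fixed32 wire type,
  // mirroring addFixed32(Float.floatToIntBits((float) fraction)) in the diff.
  static byte[] encodeFractionField(int fieldNumber, double fraction) {
    if (fieldNumber < 1 || fieldNumber > 15) {
      // Keeps the sketch simple: tags for fields 1..15 fit in a single varint byte.
      throw new IllegalArgumentException("field number must be in 1..15");
    }
    int bits = Float.floatToIntBits((float) fraction); // IEEE 754 single-precision bits
    int tag = (fieldNumber << 3) | WIRE_TYPE_FIXED32;
    ByteBuffer buf = ByteBuffer.allocate(5).order(ByteOrder.LITTLE_ENDIAN);
    buf.put((byte) tag);
    buf.putInt(bits); // fixed32 payloads are little-endian on the wire
    return buf.array();
  }

  public static void main(String[] args) {
    byte[] encoded = encodeFractionField(2, 0.5);
    StringBuilder hex = new StringBuilder();
    for (byte b : encoded) {
      hex.append(String.format("%02x ", b & 0xff));
    }
    System.out.println(hex.toString().trim());
  }
}
```

Running `main` prints `15 00 00 00 3f`. The tag byte `0x15` is `(2 << 3) | 5`. The remaining four bytes are the little-endian bits of `0.5f`, which are `0x3f000000`. Narrowing the `double` to `float` mirrors the `(float) fraction` cast in the diff. It can round fractions that are not exactly representable in single precision.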
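The issue description relies on the split-at-fraction contract. A successful split yields a primary range that the current reader keeps and a remainder that Dataflow can hand to another worker. A split fails when the requested point has already been read. The toy model below sketches that contract over row offsets. `SplittableStream` is a hypothetical class. It is not how the BigQuery Storage API splits streams: in the diff, the server does that work via `storageClient.splitReadStream(...)`.

```java
import java.util.Optional;

// Hypothetical toy model of a splittable stream over row offsets [start, end).
// It only illustrates the primary/remainder contract, not the server-side logic.
class SplittableStream {
  private final long start;
  private long end;      // exclusive; shrinks when a split succeeds
  private long position; // next row to read; rows [start, position) are consumed

  SplittableStream(long start, long end) {
    this.start = start;
    this.end = end;
    this.position = start;
  }

  // Reads one row; returns false once the (possibly shrunk) range is exhausted.
  boolean advance() {
    if (position >= end) {
      return false;
    }
    position++;
    return true;
  }

  // Tries to split the current range at `fraction`. On success this reader keeps
  // the primary [start, splitPoint) and the remainder [splitPoint, oldEnd) is
  // returned. It fails if the split point was already read or would leave an empty side.
  Optional<SplittableStream> splitAtFraction(double fraction) {
    long splitPoint = start + (long) Math.ceil(fraction * (end - start));
    if (splitPoint <= start || splitPoint < position || splitPoint >= end) {
      return Optional.empty();
    }
    SplittableStream remainder = new SplittableStream(splitPoint, end);
    end = splitPoint;
    return Optional.of(remainder);
  }

  long start() {
    return start;
  }

  long end() {
    return end;
  }

  public static void main(String[] args) {
    SplittableStream reader = new SplittableStream(0, 100);
    for (int i = 0; i < 30; i++) {
      reader.advance();
    }
    Optional<SplittableStream> remainder = reader.splitAtFraction(0.5);
    System.out.println("primary=[" + reader.start() + ", " + reader.end() + ")"
        + " remainder=" + remainder.map(r -> "[" + r.start() + ", " + r.end() + ")").orElse("none"));
    System.out.println("late split possible: " + reader.splitAtFraction(0.2).isPresent());
  }
}
```

Suppose the reader has consumed 30 of 100 rows. Splitting at 0.5 keeps [0, 50) as the primary and returns [50, 100) as the remainder. A later request at 0.2 of the shrunk range targets row 10, which was already read, so it fails. This is analogous to the "cannot be split" branch logged in the diff.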