[ https://issues.apache.org/jira/browse/BEAM-6841?focusedWorklogId=220669&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-220669 ]
ASF GitHub Bot logged work on BEAM-6841:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 29/Mar/19 19:07
            Start Date: 29/Mar/19 19:07
    Worklog Time Spent: 10m
      Work Description: chamikaramj commented on pull request #8061: [BEAM-6841] Add support for reading query results using the BigQuery storage API.
URL: https://github.com/apache/beam/pull/8061#discussion_r270521373


 ##########
 File path: sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
 ##########

 @@ -914,6 +933,181 @@ void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
     return rows.apply(new PassThroughThenCleanup<>(cleanupOperation, jobIdTokenView));
   }

+  private PCollection<T> expandForDirectRead(PBegin input, Coder<T> outputCoder) {
+    ValueProvider<TableReference> tableProvider = getTableProvider();
+    Pipeline p = input.getPipeline();
+    if (tableProvider != null) {
+      // No job ID is required. Read directly from BigQuery storage.
+      return p.apply(
+          org.apache.beam.sdk.io.Read.from(
+              BigQueryStorageTableSource.create(
+                  tableProvider,
+                  getReadOptions(),
+                  getParseFn(),
+                  outputCoder,
+                  getBigQueryServices())));
+    }
+
+    checkArgument(
+        getReadOptions() == null,
+        "Invalid BigQueryIO.Read: Specifies table read options, "
+            + "which only applies when reading from a table");
+
+    //
+    // N.B. All of the code below exists because the BigQuery storage API can't (yet) read from
+    // all anonymous tables, so we need the job ID to reason about the name of the destination
+    // table for the query to read the data and subsequently delete the table and dataset. Once
+    // the storage API can handle anonymous tables, the storage source should be modified to use
+    // anonymous tables and all of the code related to job ID generation and table and dataset
+    // cleanup can be removed.
+    //
+
+    PCollectionView<String> jobIdTokenView;
+    PCollection<T> rows;
+
+    if (!getWithTemplateCompatibility()) {
+      // Create a singleton job ID token at pipeline construction time.
+      String staticJobUuid = BigQueryHelpers.randomUUIDString();
+      jobIdTokenView =
+          p.apply("TriggerIdCreation", Create.of(staticJobUuid))
+              .apply("ViewId", View.asSingleton());
+      // Apply the traditional Source model.
+      rows =
+          p.apply(
+              org.apache.beam.sdk.io.Read.from(
+                  createStorageQuerySource(staticJobUuid, outputCoder)));
+    } else {
+      // Create a singleton job ID token at pipeline execution time.
+      PCollection<String> jobIdTokenCollection =
+          p.apply("TriggerIdCreation", Create.of("ignored"))
+              .apply(
+                  "CreateJobId",
+                  MapElements.via(
+                      new SimpleFunction<String, String>() {
+                        @Override
+                        public String apply(String input) {
+                          return BigQueryHelpers.randomUUIDString();
+                        }
+                      }));
+
+      jobIdTokenView = jobIdTokenCollection.apply("ViewId", View.asSingleton());
+
+      TupleTag<Stream> streamsTag = new TupleTag<>();
+      TupleTag<ReadSession> readSessionTag = new TupleTag<>();
+      TupleTag<String> tableSchemaTag = new TupleTag<>();
+
+      PCollectionTuple tuple =
+          jobIdTokenCollection.apply(
+              "RunQueryJob",
+              ParDo.of(
+                      new DoFn<String, Stream>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws Exception {
+                          BigQueryOptions options =
+                              c.getPipelineOptions().as(BigQueryOptions.class);
+                          String jobUuid = c.element();
+                          // Execute the query and get the destination table holding the results.
+                          BigQueryStorageQuerySource<T> querySource =
+                              createStorageQuerySource(jobUuid, outputCoder);
+                          Table queryResultTable = querySource.getTargetTable(options);
+
+                          // Create a read session without specifying a desired stream count and
+                          // let the BigQuery storage server pick the number of streams.
+                          CreateReadSessionRequest request =
+                              CreateReadSessionRequest.newBuilder()
+                                  .setParent("projects/" + options.getProject())
+                                  .setTableReference(
+                                      BigQueryHelpers.toTableRefProto(
+                                          queryResultTable.getTableReference()))
+                                  .setRequestedStreams(0)
+                                  .build();
+
+                          ReadSession readSession;
+                          try (StorageClient storageClient =
+                              getBigQueryServices().getStorageClient(options)) {
+                            readSession = storageClient.createReadSession(request);
+                          }
+
+                          for (Stream stream : readSession.getStreamsList()) {
+                            c.output(stream);
+                          }
+
+                          c.output(readSessionTag, readSession);
+                          c.output(
+                              tableSchemaTag,
+                              BigQueryHelpers.toJsonString(queryResultTable.getSchema()));
+                        }
+                      })
+                  .withOutputTags(
+                      streamsTag, TupleTagList.of(readSessionTag).and(tableSchemaTag)));
+
+      tuple.get(streamsTag).setCoder(ProtoCoder.of(Stream.class));
+      tuple.get(readSessionTag).setCoder(ProtoCoder.of(ReadSession.class));
+      tuple.get(tableSchemaTag).setCoder(StringUtf8Coder.of());
+
+      PCollectionView<ReadSession> readSessionView =
+          tuple.get(readSessionTag).apply("ReadSessionView", View.asSingleton());
+      PCollectionView<String> tableSchemaView =
+          tuple.get(tableSchemaTag).apply("TableSchemaView", View.asSingleton());
+
+      rows =
+          tuple
+              .get(streamsTag)
+              .apply(Reshuffle.viaRandomKey())
+              .apply(
+                  ParDo.of(
+                      new DoFn<Stream, T>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext c) throws Exception {
+                          ReadSession readSession = c.sideInput(readSessionView);
+                          TableSchema tableSchema =
+                              BigQueryHelpers.fromJsonString(
+                                  c.sideInput(tableSchemaView), TableSchema.class);
+                          Stream stream = c.element();
+
+                          BigQueryStorageStreamSource<T> streamSource =
+                              BigQueryStorageStreamSource.create(
+                                  readSession,
+                                  stream,
+                                  tableSchema,
+                                  getParseFn(),
+                                  outputCoder,
+                                  getBigQueryServices());
+
+                          BoundedSource.BoundedReader<T> reader =
+                              streamSource.createReader(c.getPipelineOptions());
+                          for (boolean more = reader.start(); more; more = reader.advance()) {
+                            c.output(reader.getCurrent());

 Review comment:
   Can you please add a comment along these lines here (or in a top-level comment).
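   For context, a minimal sketch of how a pipeline author would exercise this read path
   once merged. It assumes the TypedRead.Method.DIRECT_READ option routes the expansion
   through the expandForDirectRead() branch above; the query text and class name are
   illustrative only, not part of this PR.

       import com.google.api.services.bigquery.model.TableRow;
       import org.apache.beam.sdk.Pipeline;
       import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
       import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
       import org.apache.beam.sdk.options.PipelineOptionsFactory;
       import org.apache.beam.sdk.values.PCollection;

       public class StorageApiQueryReadExample {
         public static void main(String[] args) {
           Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

           PCollection<TableRow> rows =
               p.apply(
                   "ReadQueryResults",
                   BigQueryIO.readTableRows()
                       .fromQuery(
                           "SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`")
                       .usingStandardSql()
                       // DIRECT_READ selects the storage-API path: the query runs as a
                       // BigQuery job, and its destination table is then read through a
                       // read session rather than an export to GCS.
                       .withMethod(Method.DIRECT_READ));

           p.run().waitUntilFinish();
         }
       }

   Note that, per the N.B. in the diff, the query still materializes into a destination
   table that must be cleaned up afterwards, which is why the expansion threads a job ID
   through the pipeline.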
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 220669)
    Time Spent: 5h 40m  (was: 5.5h)

> Support reading query results with the BigQuery storage API
> ------------------------------------------------------------
>
>                 Key: BEAM-6841
>                 URL: https://issues.apache.org/jira/browse/BEAM-6841
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Kenneth Jung
>            Assignee: Kenneth Jung
>            Priority: Minor
>          Time Spent: 5h 40m
>  Remaining Estimate: 0h
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)