Condense BigQueryIO.Read.Bound into BigQueryIO.Read

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/825338aa
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/825338aa
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/825338aa

Branch: refs/heads/master
Commit: 825338aaa5d7e5ead1afa13f63c65fb316e1aa6a
Parents: 30f3634
Author: Eugene Kirpichov <[email protected]>
Authored: Thu Mar 2 17:15:47 2017 -0800
Committer: Thomas Groh <[email protected]>
Committed: Tue Mar 14 15:54:22 2017 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/bigquery/BigQueryIO.java    | 690 +++++++++----------
 .../sdk/io/gcp/bigquery/BigQueryIOTest.java     | 117 ++--
 2 files changed, 359 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/825338aa/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
index 2902c2b..f6c8575 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java
@@ -39,7 +39,6 @@ import com.google.api.services.bigquery.model.TableSchema;
 import com.google.cloud.hadoop.util.ApiErrorExtractor;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
-import com.google.common.base.MoreObjects;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
@@ -454,448 +453,379 @@ public class BigQueryIO {
    *   }
    * }}</pre>
    */
-  public static class Read {
+  public static class Read extends PTransform<PBegin, PCollection<TableRow>> {
+    @Nullable final ValueProvider<String> jsonTableRef;
+    @Nullable final ValueProvider<String> query;
 
     /**
      * Reads a BigQuery table specified as {@code 
"[project_id]:[dataset_id].[table_id]"} or
      * {@code "[dataset_id].[table_id]"} for tables within the current project.
      */
-    public static Bound from(String tableSpec) {
-      return new Bound().from(StaticValueProvider.of(tableSpec));
+    public static Read from(String tableSpec) {
+      return new Read().from(StaticValueProvider.of(tableSpec));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
-    public static Bound from(ValueProvider<String> tableSpec) {
-      return new Bound().from(tableSpec);
+    public static Read from(ValueProvider<String> tableSpec) {
+      return new Read().from(tableSpec);
     }
 
     /**
      * Reads results received after executing the given query.
      */
-    public static Bound fromQuery(String query) {
-      return new Bound().fromQuery(StaticValueProvider.of(query));
+    public static Read fromQuery(String query) {
+      return new Read().fromQuery(StaticValueProvider.of(query));
     }
 
     /**
      * Same as {@code from(String)}, but with a {@link ValueProvider}.
      */
-    public static Bound fromQuery(ValueProvider<String> query) {
-      return new Bound().fromQuery(query);
+    public static Read fromQuery(ValueProvider<String> query) {
+      return new Read().fromQuery(query);
     }
 
     /**
      * Reads a BigQuery table specified as a {@link TableReference} object.
      */
-    public static Bound from(TableReference table) {
-      return new Bound().from(table);
+    public static Read from(TableReference table) {
+      return new Read().from(table);
     }
 
     /**
-     * Disables BigQuery table validation, which is enabled by default.
+     * Disable validation that the table exists or the query succeeds prior to 
pipeline
+     * submission. Basic validation (such as ensuring that a query or table is 
specified) still
+     * occurs.
      */
-    public static Bound withoutValidation() {
-      return new Bound().withoutValidation();
+    final boolean validate;
+    @Nullable final Boolean flattenResults;
+    @Nullable final Boolean useLegacySql;
+    @Nullable BigQueryServices bigQueryServices;
+
+    @VisibleForTesting @Nullable String stepUuid;
+    @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
+
+    private static final String QUERY_VALIDATION_FAILURE_ERROR =
+        "Validation of query \"%1$s\" failed. If the query depends on an 
earlier stage of the"
+        + " pipeline, This validation can be disabled using 
#withoutValidation.";
+
+    private Read() {
+      this(
+          null /* name */,
+          null /* query */,
+          null /* jsonTableRef */,
+          true /* validate */,
+          null /* flattenResults */,
+          null /* useLegacySql */,
+          null /* bigQueryServices */);
+    }
+
+    private Read(
+        String name, @Nullable ValueProvider<String> query,
+        @Nullable ValueProvider<String> jsonTableRef, boolean validate,
+        @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
+        @Nullable BigQueryServices bigQueryServices) {
+      super(name);
+      this.jsonTableRef = jsonTableRef;
+      this.query = query;
+      this.validate = validate;
+      this.flattenResults = flattenResults;
+      this.useLegacySql = useLegacySql;
+      this.bigQueryServices = bigQueryServices;
     }
 
     /**
-     * A {@link PTransform} that reads from a BigQuery table and returns a 
bounded
-     * {@link PCollection} of {@link TableRow TableRows}.
+     * Disable validation that the table exists or the query succeeds prior to 
pipeline
+     * submission. Basic validation (such as ensuring that a query or table is 
specified) still
+     * occurs.
      */
-    public static class Bound extends PTransform<PBegin, 
PCollection<TableRow>> {
-      @Nullable final ValueProvider<String> jsonTableRef;
-      @Nullable final ValueProvider<String> query;
-
-      /**
-       * Disable validation that the table exists or the query succeeds prior 
to pipeline
-       * submission. Basic validation (such as ensuring that a query or table 
is specified) still
-       * occurs.
-       */
-      final boolean validate;
-      @Nullable final Boolean flattenResults;
-      @Nullable final Boolean useLegacySql;
-      @Nullable BigQueryServices bigQueryServices;
-
-      @VisibleForTesting @Nullable String stepUuid;
-      @VisibleForTesting @Nullable ValueProvider<String> jobUuid;
-
-      private static final String QUERY_VALIDATION_FAILURE_ERROR =
-          "Validation of query \"%1$s\" failed. If the query depends on an 
earlier stage of the"
-          + " pipeline, This validation can be disabled using 
#withoutValidation.";
-
-      private Bound() {
-        this(
-            null /* name */,
-            null /* query */,
-            null /* jsonTableRef */,
-            true /* validate */,
-            null /* flattenResults */,
-            null /* useLegacySql */,
-            null /* bigQueryServices */);
-      }
-
-      private Bound(
-          String name, @Nullable ValueProvider<String> query,
-          @Nullable ValueProvider<String> jsonTableRef, boolean validate,
-          @Nullable Boolean flattenResults, @Nullable Boolean useLegacySql,
-          @Nullable BigQueryServices bigQueryServices) {
-        super(name);
-        this.jsonTableRef = jsonTableRef;
-        this.query = query;
-        this.validate = validate;
-        this.flattenResults = flattenResults;
-        this.useLegacySql = useLegacySql;
-        this.bigQueryServices = bigQueryServices;
-      }
-
-      /**
-       * Returns a copy of this transform that reads from the specified table. 
Refer to
-       * {@link #parseTableSpec(String)} for the specification format.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound from(ValueProvider<String> tableSpec) {
-        return new Bound(
-            name, query,
-            NestedValueProvider.of(
-                NestedValueProvider.of(
-                    tableSpec, new TableSpecToTableRef()),
-                new TableRefToJson()),
-            validate, flattenResults, useLegacySql, bigQueryServices);
-      }
-
-      /**
-       * Returns a copy of this transform that reads from the specified table.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound from(TableReference table) {
-        return from(StaticValueProvider.of(toTableSpec(table)));
-      }
-
-      /**
-       * Returns a copy of this transform that reads the results of the 
specified query.
-       *
-       * <p>Does not modify this object.
-       *
-       * <p>By default, the query results will be flattened -- see
-       * "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
-       * Jobs documentation</a> for more information.  To disable flattening, 
use
-       * {@link BigQueryIO.Read.Bound#withoutResultFlattening}.
-       *
-       * <p>By default, the query will use BigQuery's legacy SQL dialect. To 
use the BigQuery
-       * Standard SQL dialect, use {@link 
BigQueryIO.Read.Bound#usingStandardSql}.
-       */
-      public Bound fromQuery(String query) {
-        return fromQuery(StaticValueProvider.of(query));
-      }
-
-      /**
-       * Like {@link #fromQuery(String)}, but from a {@link ValueProvider}.
-       */
-      public Bound fromQuery(ValueProvider<String> query) {
-        return new Bound(name, query, jsonTableRef, validate,
-            MoreObjects.firstNonNull(flattenResults, Boolean.TRUE),
-            MoreObjects.firstNonNull(useLegacySql, Boolean.TRUE),
-            bigQueryServices);
-      }
+    public Read withoutValidation() {
+      return new Read(
+          name, query, jsonTableRef, false /* validate */, flattenResults, 
useLegacySql,
+          bigQueryServices);
+    }
 
-      /**
-       * Disable validation that the table exists or the query succeeds prior 
to pipeline
-       * submission. Basic validation (such as ensuring that a query or table 
is specified) still
-       * occurs.
-       */
-      public Bound withoutValidation() {
-        return new Bound(
-            name, query, jsonTableRef, false /* validate */, flattenResults, 
useLegacySql,
-            bigQueryServices);
-      }
+    /**
+     * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
+     * flattening of query results</a>.
+     *
+     * <p>Only valid when a query is used ({@link #fromQuery}). Setting this 
option when reading
+     * from a table will cause an error during validation.
+     */
+    public Read withoutResultFlattening() {
+      return new Read(
+          name, query, jsonTableRef, validate, false /* flattenResults */, 
useLegacySql,
+          bigQueryServices);
+    }
 
-      /**
-       * Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
-       * flattening of query results</a>.
-       *
-       * <p>Only valid when a query is used ({@link #fromQuery}). Setting this 
option when reading
-       * from a table will cause an error during validation.
-       */
-      public Bound withoutResultFlattening() {
-        return new Bound(
-            name, query, jsonTableRef, validate, false /* flattenResults */, 
useLegacySql,
-            bigQueryServices);
-      }
+    /**
+     * Enables BigQuery's Standard SQL dialect when reading from a query.
+     *
+     * <p>Only valid when a query is used ({@link #fromQuery}). Setting this 
option when reading
+     * from a table will cause an error during validation.
+     */
+    public Read usingStandardSql() {
+      return new Read(
+          name, query, jsonTableRef, validate, flattenResults, false /* 
useLegacySql */,
+          bigQueryServices);
+    }
 
-      /**
-       * Enables BigQuery's Standard SQL dialect when reading from a query.
-       *
-       * <p>Only valid when a query is used ({@link #fromQuery}). Setting this 
option when reading
-       * from a table will cause an error during validation.
-       */
-      public Bound usingStandardSql() {
-        return new Bound(
-            name, query, jsonTableRef, validate, flattenResults, false /* 
useLegacySql */,
-            bigQueryServices);
-      }
+    @VisibleForTesting
+    Read withTestServices(BigQueryServices testServices) {
+      return new Read(
+          name, query, jsonTableRef, validate, flattenResults, useLegacySql, 
testServices);
+    }
 
-      @VisibleForTesting
-      Bound withTestServices(BigQueryServices testServices) {
-        return new Bound(
-            name, query, jsonTableRef, validate, flattenResults, useLegacySql, 
testServices);
+    @Override
+    public void validate(PBegin input) {
+      // Even if existence validation is disabled, we need to make sure that 
the BigQueryIO
+      // read is properly specified.
+      BigQueryOptions bqOptions = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
+
+      String tempLocation = bqOptions.getTempLocation();
+      checkArgument(
+          !Strings.isNullOrEmpty(tempLocation),
+          "BigQueryIO.Read needs a GCS temp location to store temp files.");
+      if (bigQueryServices == null) {
+        try {
+          GcsPath.fromUri(tempLocation);
+        } catch (IllegalArgumentException e) {
+          throw new IllegalArgumentException(
+              String.format(
+                  "BigQuery temp location expected a valid 'gs://' path, but 
was given '%s'",
+                  tempLocation),
+              e);
+        }
       }
 
-      @Override
-      public void validate(PBegin input) {
-        // Even if existence validation is disabled, we need to make sure that 
the BigQueryIO
-        // read is properly specified.
-        BigQueryOptions bqOptions = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
+      ValueProvider<TableReference> table = 
getTableWithDefaultProject(bqOptions);
 
-        String tempLocation = bqOptions.getTempLocation();
-        checkArgument(
-            !Strings.isNullOrEmpty(tempLocation),
-            "BigQueryIO.Read needs a GCS temp location to store temp files.");
-        if (bigQueryServices == null) {
-          try {
-            GcsPath.fromUri(tempLocation);
-          } catch (IllegalArgumentException e) {
-            throw new IllegalArgumentException(
-                String.format(
-                    "BigQuery temp location expected a valid 'gs://' path, but 
was given '%s'",
-                    tempLocation),
-                e);
-          }
-        }
-
-        ValueProvider<TableReference> table = 
getTableWithDefaultProject(bqOptions);
+      checkState(
+          table == null || query == null,
+          "Invalid BigQueryIO.Read: table reference and query may not both be 
set");
+      checkState(
+          table != null || query != null,
+          "Invalid BigQueryIO.Read: one of table reference and query must be 
set");
 
+      if (table != null) {
         checkState(
-            table == null || query == null,
-            "Invalid BigQueryIO.Read: table reference and query may not both 
be set");
+            flattenResults == null,
+            "Invalid BigQueryIO.Read: Specifies a table with a result 
flattening"
+                + " preference, which only applies to queries");
         checkState(
-            table != null || query != null,
-            "Invalid BigQueryIO.Read: one of table reference and query must be 
set");
-
-        if (table != null) {
-          checkState(
-              flattenResults == null,
-              "Invalid BigQueryIO.Read: Specifies a table with a result 
flattening"
-                  + " preference, which only applies to queries");
-          checkState(
-              useLegacySql == null,
-              "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
-                  + " preference, which only applies to queries");
-        } else /* query != null */ {
-          checkState(flattenResults != null, "flattenResults should not be 
null if query is set");
-          checkState(useLegacySql != null, "useLegacySql should not be null if 
query is set");
-        }
-
-        // Note that a table or query check can fail if the table or dataset 
are created by
-        // earlier stages of the pipeline or if a query depends on earlier 
stages of a pipeline.
-        // For these cases the withoutValidation method can be used to disable 
the check.
-        if (validate && table != null) {
-          checkState(table.isAccessible(), "Cannot call validate if table is 
dynamically set.");
-          // Check for source table presence for early failure notification.
-          DatasetService datasetService = 
getBigQueryServices().getDatasetService(bqOptions);
-          verifyDatasetPresence(datasetService, table.get());
-          verifyTablePresence(datasetService, table.get());
-        } else if (validate && query != null) {
-          checkState(query.isAccessible(), "Cannot call validate if query is 
dynamically set.");
-          JobService jobService = 
getBigQueryServices().getJobService(bqOptions);
-          try {
-            jobService.dryRunQuery(
-                bqOptions.getProject(),
-                new JobConfigurationQuery()
-                    .setQuery(query.get())
-                    .setFlattenResults(flattenResults)
-                    .setUseLegacySql(useLegacySql));
-          } catch (Exception e) {
-            throw new IllegalArgumentException(
-                String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
-          }
-        }
-      }
-
-      @Override
-      public PCollection<TableRow> expand(PBegin input) {
-        stepUuid = randomUUIDString();
-        BigQueryOptions bqOptions = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
-        jobUuid = NestedValueProvider.of(
-           StaticValueProvider.of(bqOptions.getJobName()), new 
CreatePerBeamJobUuid(stepUuid));
-        final ValueProvider<String> jobIdToken = NestedValueProvider.of(
-            jobUuid, new BeamJobUuidToBigQueryJobUuid());
-
-        BoundedSource<TableRow> source;
-        final BigQueryServices bqServices = getBigQueryServices();
-
-        final String extractDestinationDir;
-        String tempLocation = bqOptions.getTempLocation();
+            useLegacySql == null,
+            "Invalid BigQueryIO.Read: Specifies a table with a SQL dialect"
+                + " preference, which only applies to queries");
+      } else /* query != null */ {
+        checkState(flattenResults != null, "flattenResults should not be null 
if query is set");
+        checkState(useLegacySql != null, "useLegacySql should not be null if 
query is set");
+      }
+
+      // Note that a table or query check can fail if the table or dataset are 
created by
+      // earlier stages of the pipeline or if a query depends on earlier 
stages of a pipeline.
+      // For these cases the withoutValidation method can be used to disable 
the check.
+      if (validate && table != null) {
+        checkState(table.isAccessible(), "Cannot call validate if table is 
dynamically set.");
+        // Check for source table presence for early failure notification.
+        DatasetService datasetService = 
getBigQueryServices().getDatasetService(bqOptions);
+        verifyDatasetPresence(datasetService, table.get());
+        verifyTablePresence(datasetService, table.get());
+      } else if (validate && query != null) {
+        checkState(query.isAccessible(), "Cannot call validate if query is 
dynamically set.");
+        JobService jobService = getBigQueryServices().getJobService(bqOptions);
         try {
-          IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
-          extractDestinationDir = factory.resolve(tempLocation, stepUuid);
-        } catch (IOException e) {
-          throw new RuntimeException(
-              String.format("Failed to resolve extract destination directory 
in %s", tempLocation));
-        }
-
-        final String executingProject = bqOptions.getProject();
-        if (query != null && (!query.isAccessible() || 
!Strings.isNullOrEmpty(query.get()))) {
-          source = BigQueryQuerySource.create(
-              jobIdToken, query, NestedValueProvider.of(
-                jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
-              flattenResults, useLegacySql, extractDestinationDir, bqServices);
-        } else {
-          ValueProvider<TableReference> inputTable = 
getTableWithDefaultProject(bqOptions);
-          source = BigQueryTableSource.create(
-              jobIdToken, inputTable, extractDestinationDir, bqServices,
-              StaticValueProvider.of(executingProject));
+          jobService.dryRunQuery(
+              bqOptions.getProject(),
+              new JobConfigurationQuery()
+                  .setQuery(query.get())
+                  .setFlattenResults(flattenResults)
+                  .setUseLegacySql(useLegacySql));
+        } catch (Exception e) {
+          throw new IllegalArgumentException(
+              String.format(QUERY_VALIDATION_FAILURE_ERROR, query.get()), e);
         }
-        PassThroughThenCleanup.CleanupOperation cleanupOperation =
-            new PassThroughThenCleanup.CleanupOperation() {
-              @Override
-              void cleanup(PipelineOptions options) throws Exception {
-                BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
-
-                JobReference jobRef = new JobReference()
-                    .setProjectId(executingProject)
-                    .setJobId(getExtractJobId(jobIdToken));
-
-                Job extractJob = bqServices.getJobService(bqOptions)
-                    .getJob(jobRef);
-
-                Collection<String> extractFiles = null;
-                if (extractJob != null) {
-                  extractFiles = getExtractFilePaths(extractDestinationDir, 
extractJob);
-                } else {
-                  IOChannelFactory factory = 
IOChannelUtils.getFactory(extractDestinationDir);
-                  Collection<String> dirMatch = 
factory.match(extractDestinationDir);
-                  if (!dirMatch.isEmpty()) {
-                    extractFiles = 
factory.match(factory.resolve(extractDestinationDir, "*"));
-                  }
-                }
-                if (extractFiles != null && !extractFiles.isEmpty()) {
-                  new GcsUtilFactory().create(options).remove(extractFiles);
-                }
-              }};
-        return input.getPipeline()
-            .apply(org.apache.beam.sdk.io.Read.from(source))
-            .setCoder(getDefaultOutputCoder())
-            .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
       }
+    }
 
-      @Override
-      protected Coder<TableRow> getDefaultOutputCoder() {
-        return TableRowJsonCoder.of();
+    @Override
+    public PCollection<TableRow> expand(PBegin input) {
+      stepUuid = randomUUIDString();
+      BigQueryOptions bqOptions = 
input.getPipeline().getOptions().as(BigQueryOptions.class);
+      jobUuid = NestedValueProvider.of(
+         StaticValueProvider.of(bqOptions.getJobName()), new 
CreatePerBeamJobUuid(stepUuid));
+      final ValueProvider<String> jobIdToken = NestedValueProvider.of(
+          jobUuid, new BeamJobUuidToBigQueryJobUuid());
+
+      BoundedSource<TableRow> source;
+      final BigQueryServices bqServices = getBigQueryServices();
+
+      final String extractDestinationDir;
+      String tempLocation = bqOptions.getTempLocation();
+      try {
+        IOChannelFactory factory = IOChannelUtils.getFactory(tempLocation);
+        extractDestinationDir = factory.resolve(tempLocation, stepUuid);
+      } catch (IOException e) {
+        throw new RuntimeException(
+            String.format("Failed to resolve extract destination directory in 
%s", tempLocation));
       }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
-        builder
-            .addIfNotNull(DisplayData.item("table", 
displayTable(getTableProvider()))
-              .withLabel("Table"))
-            .addIfNotNull(DisplayData.item("query", query)
-              .withLabel("Query"))
-            .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
-              .withLabel("Flatten Query Results"))
-            .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
-              .withLabel("Use Legacy SQL Dialect"))
-            .addIfNotDefault(DisplayData.item("validation", validate)
-              .withLabel("Validation Enabled"),
-                true);
+      final String executingProject = bqOptions.getProject();
+      if (query != null && (!query.isAccessible() || 
!Strings.isNullOrEmpty(query.get()))) {
+        source = BigQueryQuerySource.create(
+            jobIdToken, query, NestedValueProvider.of(
+              jobUuid, new CreateJsonTableRefFromUuid(executingProject)),
+            flattenResults, useLegacySql, extractDestinationDir, bqServices);
+      } else {
+        ValueProvider<TableReference> inputTable = 
getTableWithDefaultProject(bqOptions);
+        source = BigQueryTableSource.create(
+            jobIdToken, inputTable, extractDestinationDir, bqServices,
+            StaticValueProvider.of(executingProject));
       }
+      PassThroughThenCleanup.CleanupOperation cleanupOperation =
+          new PassThroughThenCleanup.CleanupOperation() {
+            @Override
+            void cleanup(PipelineOptions options) throws Exception {
+              BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
+
+              JobReference jobRef = new JobReference()
+                  .setProjectId(executingProject)
+                  .setJobId(getExtractJobId(jobIdToken));
+
+              Job extractJob = bqServices.getJobService(bqOptions)
+                  .getJob(jobRef);
+
+              Collection<String> extractFiles = null;
+              if (extractJob != null) {
+                extractFiles = getExtractFilePaths(extractDestinationDir, 
extractJob);
+              } else {
+                IOChannelFactory factory = 
IOChannelUtils.getFactory(extractDestinationDir);
+                Collection<String> dirMatch = 
factory.match(extractDestinationDir);
+                if (!dirMatch.isEmpty()) {
+                  extractFiles = 
factory.match(factory.resolve(extractDestinationDir, "*"));
+                }
+              }
+              if (extractFiles != null && !extractFiles.isEmpty()) {
+                new GcsUtilFactory().create(options).remove(extractFiles);
+              }
+            }};
+      return input.getPipeline()
+          .apply(org.apache.beam.sdk.io.Read.from(source))
+          .setCoder(getDefaultOutputCoder())
+          .apply(new PassThroughThenCleanup<TableRow>(cleanupOperation));
+    }
 
-      /**
-       * Returns the table to read, or {@code null} if reading from a query 
instead.
-       *
-       * <p>If the table's project is not specified, use the executing project.
-       */
-      @Nullable private ValueProvider<TableReference> 
getTableWithDefaultProject(
-          BigQueryOptions bqOptions) {
-        ValueProvider<TableReference> table = getTableProvider();
-        if (table == null) {
-          return table;
-        }
-        if (!table.isAccessible()) {
-          LOG.info("Using a dynamic value for table input. This must contain a 
project"
-              + " in the table reference: {}", table);
-          return table;
-        }
-        if (Strings.isNullOrEmpty(table.get().getProjectId())) {
-          // If user does not specify a project we assume the table to be 
located in
-          // the default project.
-          TableReference tableRef = table.get();
-          tableRef.setProjectId(bqOptions.getProject());
-          return NestedValueProvider.of(StaticValueProvider.of(
-              toJsonString(tableRef)), new JsonTableRefToTableRef());
-        }
+    @Override
+    protected Coder<TableRow> getDefaultOutputCoder() {
+      return TableRowJsonCoder.of();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("table", 
displayTable(getTableProvider()))
+            .withLabel("Table"))
+          .addIfNotNull(DisplayData.item("query", query)
+            .withLabel("Query"))
+          .addIfNotNull(DisplayData.item("flattenResults", flattenResults)
+            .withLabel("Flatten Query Results"))
+          .addIfNotNull(DisplayData.item("useLegacySql", useLegacySql)
+            .withLabel("Use Legacy SQL Dialect"))
+          .addIfNotDefault(DisplayData.item("validation", validate)
+            .withLabel("Validation Enabled"),
+              true);
+    }
+
+    /**
+     * Returns the table to read, or {@code null} if reading from a query 
instead.
+     *
+     * <p>If the table's project is not specified, use the executing project.
+     */
+    @Nullable private ValueProvider<TableReference> getTableWithDefaultProject(
+        BigQueryOptions bqOptions) {
+      ValueProvider<TableReference> table = getTableProvider();
+      if (table == null) {
         return table;
       }
-
-      /**
-       * Returns the table to read, or {@code null} if reading from a query 
instead.
-       */
-      @Nullable
-      public ValueProvider<TableReference> getTableProvider() {
-        return jsonTableRef == null
-            ? null : NestedValueProvider.of(jsonTableRef, new 
JsonTableRefToTableRef());
+      if (!table.isAccessible()) {
+        LOG.info("Using a dynamic value for table input. This must contain a 
project"
+            + " in the table reference: {}", table);
+        return table;
       }
-      /**
-       * Returns the table to read, or {@code null} if reading from a query 
instead.
-       */
-      @Nullable
-      public TableReference getTable() {
-        ValueProvider<TableReference> provider = getTableProvider();
-        return provider == null ? null : provider.get();
+      if (Strings.isNullOrEmpty(table.get().getProjectId())) {
+        // If user does not specify a project we assume the table to be 
located in
+        // the default project.
+        TableReference tableRef = table.get();
+        tableRef.setProjectId(bqOptions.getProject());
+        return NestedValueProvider.of(StaticValueProvider.of(
+            toJsonString(tableRef)), new JsonTableRefToTableRef());
       }
+      return table;
+    }
 
-      /**
-       * Returns the query to be read, or {@code null} if reading from a table 
instead.
-       */
-      @Nullable
-      public String getQuery() {
-        return query == null ? null : query.get();
-      }
+    /**
+     * Returns the table to read, or {@code null} if reading from a query 
instead.
+     */
+    @Nullable
+    public ValueProvider<TableReference> getTableProvider() {
+      return jsonTableRef == null
+          ? null : NestedValueProvider.of(jsonTableRef, new 
JsonTableRefToTableRef());
+    }
+    /**
+     * Returns the table to read, or {@code null} if reading from a query 
instead.
+     */
+    @Nullable
+    public TableReference getTable() {
+      ValueProvider<TableReference> provider = getTableProvider();
+      return provider == null ? null : provider.get();
+    }
 
-      /**
-       * Returns the query to be read, or {@code null} if reading from a table 
instead.
-       */
-      @Nullable
-      public ValueProvider<String> getQueryProvider() {
-        return query;
-      }
+    /**
+     * Returns the query to be read, or {@code null} if reading from a table 
instead.
+     */
+    @Nullable
+    public String getQuery() {
+      return query == null ? null : query.get();
+    }
 
-      /**
-       * Returns true if table validation is enabled.
-       */
-      public boolean getValidate() {
-        return validate;
-      }
+    /**
+     * Returns the query to be read, or {@code null} if reading from a table 
instead.
+     */
+    @Nullable
+    public ValueProvider<String> getQueryProvider() {
+      return query;
+    }
 
-      /**
-       * Returns true/false if result flattening is enabled/disabled, or null 
if not applicable.
-       */
-      public Boolean getFlattenResults() {
-        return flattenResults;
-      }
+    /**
+     * Returns true if table validation is enabled.
+     */
+    public boolean getValidate() {
+      return validate;
+    }
 
-      /**
-       * Returns true (false) if the query will (will not) use BigQuery's 
legacy SQL mode, or null
-       * if not applicable.
-       */
-      @Nullable
-      public Boolean getUseLegacySql() {
-        return useLegacySql;
-      }
+    /**
+     * Returns true/false if result flattening is enabled/disabled, or null if 
not applicable.
+     */
+    public Boolean getFlattenResults() {
+      return flattenResults;
+    }
 
-      private BigQueryServices getBigQueryServices() {
-        if (bigQueryServices == null) {
-          bigQueryServices = new BigQueryServicesImpl();
-        }
-        return bigQueryServices;
-      }
+    /**
+     * Returns true (false) if the query will (will not) use BigQuery's legacy 
SQL mode, or null
+     * if not applicable.
+     */
+    @Nullable
+    public Boolean getUseLegacySql() {
+      return useLegacySql;
     }
 
-    /** Disallow construction of utility class. */
-    private Read() {}
+    private BigQueryServices getBigQueryServices() {
+      if (bigQueryServices == null) {
+        bigQueryServices = new BigQueryServicesImpl();
+      }
+      return bigQueryServices;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/825338aa/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index c9061a3..bb1528b 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -665,28 +665,28 @@ public class BigQueryIOTest implements Serializable {
   @Mock(extraInterfaces = Serializable.class) private transient DatasetService mockDatasetService;
 
   private void checkReadTableObject(
-      BigQueryIO.Read.Bound bound, String project, String dataset, String table) {
-    checkReadTableObjectWithValidate(bound, project, dataset, table, true);
+      BigQueryIO.Read read, String project, String dataset, String table) {
+    checkReadTableObjectWithValidate(read, project, dataset, table, true);
   }
 
-  private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) {
-    checkReadQueryObjectWithValidate(bound, query, true);
+  private void checkReadQueryObject(BigQueryIO.Read read, String query) {
+    checkReadQueryObjectWithValidate(read, query, true);
   }
 
   private void checkReadTableObjectWithValidate(
-      BigQueryIO.Read.Bound bound, String project, String dataset, String table, boolean validate) {
-    assertEquals(project, bound.getTable().getProjectId());
-    assertEquals(dataset, bound.getTable().getDatasetId());
-    assertEquals(table, bound.getTable().getTableId());
-    assertNull(bound.query);
-    assertEquals(validate, bound.getValidate());
+      BigQueryIO.Read read, String project, String dataset, String table, boolean validate) {
+    assertEquals(project, read.getTable().getProjectId());
+    assertEquals(dataset, read.getTable().getDatasetId());
+    assertEquals(table, read.getTable().getTableId());
+    assertNull(read.query);
+    assertEquals(validate, read.getValidate());
   }
 
   private void checkReadQueryObjectWithValidate(
-      BigQueryIO.Read.Bound bound, String query, boolean validate) {
-    assertNull(bound.getTable());
-    assertEquals(query, bound.getQuery());
-    assertEquals(validate, bound.getValidate());
+      BigQueryIO.Read read, String query, boolean validate) {
+    assertNull(read.getTable());
+    assertEquals(query, read.getQuery());
+    assertEquals(validate, read.getValidate());
   }
 
   private void checkWriteObject(
@@ -728,39 +728,39 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildTableBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
-    checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
+    BigQueryIO.Read read = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
+    checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
   }
 
   @Test
   public void testBuildQueryBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query");
-    checkReadQueryObject(bound, "foo_query");
+    BigQueryIO.Read read = BigQueryIO.Read.fromQuery("foo_query");
+    checkReadQueryObject(read, "foo_query");
   }
 
   @Test
   public void testBuildTableBasedSourceWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Read.Bound bound =
+    BigQueryIO.Read read =
         BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
-    checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false);
+    checkReadTableObjectWithValidate(read, "foo.com:project", "somedataset", "sometable", false);
   }
 
   @Test
   public void testBuildQueryBasedSourceWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Read.Bound bound =
+    BigQueryIO.Read read =
         BigQueryIO.Read.fromQuery("some_query").withoutValidation();
-    checkReadQueryObjectWithValidate(bound, "some_query", false);
+    checkReadQueryObjectWithValidate(read, "some_query", false);
   }
 
   @Test
   public void testBuildTableBasedSourceWithDefaultProject() {
-    BigQueryIO.Read.Bound bound =
+    BigQueryIO.Read read =
         BigQueryIO.Read.from("somedataset.sometable");
-    checkReadTableObject(bound, null, "somedataset", "sometable");
+    checkReadTableObject(read, null, "somedataset", "sometable");
   }
 
   @Test
@@ -769,8 +769,8 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table);
-    checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
+    BigQueryIO.Read read = BigQueryIO.Read.from(table);
+    checkReadTableObject(read, "foo.com:project", "somedataset", "sometable");
   }
 
   @Test
@@ -807,39 +807,6 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   @Category(NeedsRunner.class)
-  public void testBuildSourceWithoutTableQueryOrValidation() {
-    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
-    bqOptions.setTempLocation("gs://testbucket/testdir");
-
-    Pipeline p = TestPipeline.create(bqOptions);
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQueryIO.Read: one of table reference and query must be set");
-    p.apply(BigQueryIO.Read.withoutValidation());
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
-  public void testBuildSourceWithTableAndQuery() {
-    BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
-    bqOptions.setProject("defaultProject");
-    bqOptions.setTempLocation("gs://testbucket/testdir");
-
-    Pipeline p = TestPipeline.create(bqOptions);
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQueryIO.Read: table reference and query may not both be set");
-    p.apply("ReadMyTable",
-        BigQueryIO.Read
-            .from("foo.com:project:somedataset.sometable")
-            .fromQuery("query"));
-    p.run();
-  }
-
-  @Test
-  @Category(NeedsRunner.class)
   public void testBuildSourceWithTableAndFlatten() {
     BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
     bqOptions.setProject("defaultProject");
@@ -1291,12 +1258,11 @@ public class BigQueryIOTest implements Serializable {
   }
 
   @Test
-  public void testBuildSourceDisplayData() {
+  public void testBuildSourceDisplayDataTable() {
     String tableSpec = "project:dataset.tableid";
 
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.Read
         .from(tableSpec)
-        .fromQuery("myQuery")
         .withoutResultFlattening()
         .usingStandardSql()
         .withoutValidation();
@@ -1304,6 +1270,21 @@ public class BigQueryIOTest implements Serializable {
     DisplayData displayData = DisplayData.from(read);
 
     assertThat(displayData, hasDisplayItem("table", tableSpec));
+    assertThat(displayData, hasDisplayItem("flattenResults", false));
+    assertThat(displayData, hasDisplayItem("useLegacySql", false));
+    assertThat(displayData, hasDisplayItem("validation", false));
+  }
+
+  @Test
+  public void testBuildSourceDisplayDataQuery() {
+    BigQueryIO.Read read = BigQueryIO.Read
+        .fromQuery("myQuery")
+        .withoutResultFlattening()
+        .usingStandardSql()
+        .withoutValidation();
+
+    DisplayData displayData = DisplayData.from(read);
+
     assertThat(displayData, hasDisplayItem("query", "myQuery"));
     assertThat(displayData, hasDisplayItem("flattenResults", false));
     assertThat(displayData, hasDisplayItem("useLegacySql", false));
@@ -1315,7 +1296,7 @@ public class BigQueryIOTest implements Serializable {
   @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
   public void testTableSourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.Read
         .from("project:dataset.tableId")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
@@ -1332,7 +1313,7 @@ public class BigQueryIOTest implements Serializable {
   @Ignore("[BEAM-436] DirectRunner RunnableOnService tempLocation configuration insufficient")
   public void testQuerySourcePrimitiveDisplayData() throws IOException, InterruptedException {
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    BigQueryIO.Read.Bound read = BigQueryIO.Read
+    BigQueryIO.Read read = BigQueryIO.Read
         .fromQuery("foobar")
         .withTestServices(new FakeBigQueryServices()
             .withDatasetService(mockDatasetService)
@@ -2375,7 +2356,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Read.Bound read = BigQueryIO.Read.from(
+    BigQueryIO.Read read = BigQueryIO.Read.from(
         options.getInputTable()).withoutValidation();
     pipeline.apply(read);
     // Test that this doesn't throw.
@@ -2388,7 +2369,7 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     bqOptions.setTempLocation("gs://testbucket/testdir");
     Pipeline pipeline = TestPipeline.create(options);
-    BigQueryIO.Read.Bound read = BigQueryIO.Read.fromQuery(
+    BigQueryIO.Read read = BigQueryIO.Read.fromQuery(
         options.getInputQuery()).withoutValidation();
     pipeline.apply(read);
     // Test that this doesn't throw.
@@ -2497,10 +2478,10 @@ public class BigQueryIOTest implements Serializable {
     BigQueryOptions bqOptions = options.as(BigQueryOptions.class);
     Pipeline pipeline = TestPipeline.create(options);
     bqOptions.setTempLocation("gs://testbucket/testdir");
-    BigQueryIO.Read.Bound read1 = BigQueryIO.Read.fromQuery(
+    BigQueryIO.Read read1 = BigQueryIO.Read.fromQuery(
         options.getInputQuery()).withoutValidation();
     pipeline.apply(read1);
-    BigQueryIO.Read.Bound read2 = BigQueryIO.Read.fromQuery(
+    BigQueryIO.Read read2 = BigQueryIO.Read.fromQuery(
         options.getInputQuery()).withoutValidation();
     pipeline.apply(read2);
     assertNotEquals(read1.stepUuid, read2.stepUuid);

Reply via email to