[BEAM-50] Remove BigQueryIO.Write.Bound translator
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7f51f6af Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7f51f6af Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7f51f6af Branch: refs/heads/master Commit: 7f51f6af61dbae4e4d36d2fd1e94be945b60b11c Parents: 9039949 Author: Pei He <pe...@google.com> Authored: Fri Apr 15 16:19:54 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Fri Apr 15 17:32:46 2016 -0700 ---------------------------------------------------------------------- .../sdk/runners/DataflowPipelineTranslator.java | 2 - .../runners/dataflow/BigQueryIOTranslator.java | 55 -------------------- 2 files changed, 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f51f6af/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java index 0a71f65..4e60545 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/DataflowPipelineTranslator.java @@ -1041,8 +1041,6 @@ public class DataflowPipelineTranslator { registerTransformTranslator( BigQueryIO.Read.Bound.class, new BigQueryIOTranslator.ReadTranslator()); - registerTransformTranslator( - BigQueryIO.Write.Bound.class, new BigQueryIOTranslator.WriteTranslator()); registerTransformTranslator( PubsubIO.Read.Bound.class, new PubsubIOTranslator.ReadTranslator()); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7f51f6af/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/dataflow/BigQueryIOTranslator.java ---------------------------------------------------------------------- diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/dataflow/BigQueryIOTranslator.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/dataflow/BigQueryIOTranslator.java index 51d7000..b0952a6 100755 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/dataflow/BigQueryIOTranslator.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/sdk/runners/dataflow/BigQueryIOTranslator.java @@ -17,26 +17,19 @@ */ package org.apache.beam.sdk.runners.dataflow; -import org.apache.beam.sdk.coders.TableRowJsonCoder; import org.apache.beam.sdk.io.BigQueryIO; import org.apache.beam.sdk.runners.DataflowPipelineTranslator; import org.apache.beam.sdk.util.PropertyNames; -import org.apache.beam.sdk.util.Transport; -import org.apache.beam.sdk.util.WindowedValue; -import com.google.api.client.json.JsonFactory; import com.google.api.services.bigquery.model.TableReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; - /** * BigQuery transform support code for the Dataflow backend. */ public class BigQueryIOTranslator { - private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory(); private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOTranslator.class); /** @@ -76,52 +69,4 @@ public class BigQueryIOTranslator { context.addValueOnlyOutput(PropertyNames.OUTPUT, context.getOutput(transform)); } } - - /** - * Implements BigQueryIO Write translation for the Dataflow backend. - */ - public static class WriteTranslator - implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Write.Bound> { - - @Override - public void translate(BigQueryIO.Write.Bound transform, - DataflowPipelineTranslator.TranslationContext context) { - if (context.getPipelineOptions().isStreaming()) { - // Streaming is handled by the streaming runner. - throw new AssertionError( - "BigQueryIO is specified to use streaming write in batch mode."); - } - - TableReference table = transform.getTable(); - - // Actual translation. - context.addStep(transform, "ParallelWrite"); - context.addInput(PropertyNames.FORMAT, "bigquery"); - context.addInput(PropertyNames.BIGQUERY_TABLE, - table.getTableId()); - context.addInput(PropertyNames.BIGQUERY_DATASET, - table.getDatasetId()); - if (table.getProjectId() != null) { - context.addInput(PropertyNames.BIGQUERY_PROJECT, table.getProjectId()); - } - if (transform.getSchema() != null) { - try { - context.addInput(PropertyNames.BIGQUERY_SCHEMA, - JSON_FACTORY.toString(transform.getSchema())); - } catch (IOException exn) { - throw new IllegalArgumentException("Invalid table schema.", exn); - } - } - context.addInput( - PropertyNames.BIGQUERY_CREATE_DISPOSITION, - transform.getCreateDisposition().name()); - context.addInput( - PropertyNames.BIGQUERY_WRITE_DISPOSITION, - transform.getWriteDisposition().name()); - // Set sink encoding to TableRowJsonCoder. - context.addEncodingInput( - WindowedValue.getValueOnlyCoder(TableRowJsonCoder.of())); - context.addInput(PropertyNames.PARALLEL_INPUT, context.getInput(transform)); - } - } }