This is an automated email from the ASF dual-hosted git repository.

jkff pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 540ce4e  [BEAM-2870] Strips partition decorators when 
creating/patching tables in batch
     new ead5d43  This closes #4177: [BEAM-2870] Strips partition decorators 
when creating/patching tables in batch
540ce4e is described below

commit 540ce4ee87973cf82e2246ffd5d686055203069e
Author: Eugene Kirpichov <kirpic...@google.com>
AuthorDate: Mon Nov 27 10:28:02 2017 -0800

    [BEAM-2870] Strips partition decorators when creating/patching tables in 
batch
---
 .../apache/beam/sdk/io/gcp/bigquery/WriteTables.java  |  4 +++-
 .../beam/sdk/io/gcp/bigquery/BigQueryIOTest.java      | 12 +++++++++---
 .../beam/sdk/io/gcp/bigquery/FakeDatasetService.java  | 19 ++++++++++++++-----
 .../beam/sdk/io/gcp/bigquery/FakeJobService.java      |  9 ++++++++-
 4 files changed, 34 insertions(+), 10 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
index 9ca253c..7077651 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteTables.java
@@ -264,7 +264,9 @@ class WriteTables<DestinationT>
       switch (jobStatus) {
         case SUCCEEDED:
           if (tableDescription != null) {
-            datasetService.patchTableDescription(ref, tableDescription);
+            datasetService.patchTableDescription(
+                
ref.clone().setTableId(BigQueryHelpers.stripPartitionDecorator(ref.getTableId())),
+                tableDescription);
           }
           return;
         case UNKNOWN:
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
index 08f7464..31c1781 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java
@@ -589,7 +589,12 @@ public class BigQueryIOTest implements Serializable {
 
     if (streaming) {
       users = users.setIsBoundedInternal(PCollection.IsBounded.UNBOUNDED);
+
     }
+
+    // Use a partition decorator to verify that partition decorators are 
supported.
+    final String partitionDecorator = "20171127";
+
     users.apply(
         "WriteBigQuery",
         BigQueryIO.<String>write()
@@ -630,7 +635,8 @@ public class BigQueryIOTest implements Serializable {
                     verifySideInputs();
                     // Each user in it's own table.
                     return new TableDestination(
-                        "dataset-id.userid-" + userId, "table for userid " + 
userId);
+                        "dataset-id.userid-" + userId + "$" + 
partitionDecorator,
+                        "table for userid " + userId);
                   }
 
                   @Override
@@ -2528,7 +2534,7 @@ public class BigQueryIOTest implements Serializable {
     p.apply(Create.of(row1, row2))
         .apply(
             BigQueryIO.writeTableRows()
-                .to("project-id:dataset-id.table-id$decorator")
+                .to("project-id:dataset-id.table-id$20171127")
                 .withTestServices(fakeBqServices)
                 .withMethod(Method.STREAMING_INSERTS)
                 .withSchema(schema)
@@ -2539,7 +2545,7 @@ public class BigQueryIOTest implements Serializable {
   @Test
   public void testTableDecoratorStripping() {
     assertEquals("project:dataset.table",
-        
BigQueryHelpers.stripPartitionDecorator("project:dataset.table$decorator"));
+        
BigQueryHelpers.stripPartitionDecorator("project:dataset.table$20171127"));
     assertEquals("project:dataset.table",
         BigQueryHelpers.stripPartitionDecorator("project:dataset.table"));
   }
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
index ffab123..9609ada 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeDatasetService.java
@@ -96,6 +96,7 @@ class FakeDatasetService implements DatasetService, 
Serializable {
 
   @Override
   public void deleteTable(TableReference tableRef) throws IOException, 
InterruptedException {
+    validateWholeTableReference(tableRef);
     synchronized (BigQueryIOTest.tables) {
       Map<String, TableContainer> dataset =
           BigQueryIOTest.tables.get(tableRef.getProjectId(), 
tableRef.getDatasetId());
@@ -109,12 +110,13 @@ class FakeDatasetService implements DatasetService, 
Serializable {
     }
   }
 
-
-  @Override
-  public void createTable(Table table) throws IOException {
+  /**
+   * Validates a table reference for whole-table operations, such as 
create/delete/patch. Such
+   * operations do not support partition decorators.
+   */
+  private static void validateWholeTableReference(TableReference 
tableReference)
+      throws IOException {
     final Pattern tableRegexp = Pattern.compile("[-\\w]{1,1024}");
-
-    TableReference tableReference = table.getTableReference();
     if (!tableRegexp.matcher(tableReference.getTableId()).matches()) {
       throw new IOException(
           String.format(
@@ -123,6 +125,12 @@ class FakeDatasetService implements DatasetService, 
Serializable {
                   + " decorators cannot be used.",
               tableReference.getTableId()));
     }
+  }
+
+  @Override
+  public void createTable(Table table) throws IOException {
+    TableReference tableReference = table.getTableReference();
+    validateWholeTableReference(tableReference);
     synchronized (BigQueryIOTest.tables) {
       Map<String, TableContainer> dataset =
           BigQueryIOTest.tables.get(tableReference.getProjectId(), 
tableReference.getDatasetId());
@@ -245,6 +253,7 @@ class FakeDatasetService implements DatasetService, 
Serializable {
   public Table patchTableDescription(TableReference tableReference,
                                      @Nullable String tableDescription)
       throws IOException, InterruptedException {
+    validateWholeTableReference(tableReference);
     synchronized (BigQueryIOTest.tables) {
       TableContainer tableContainer = 
getTableContainer(tableReference.getProjectId(),
           tableReference.getDatasetId(), tableReference.getTableId());
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
index edf2a55..fcf464f 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/FakeJobService.java
@@ -312,7 +312,14 @@ class FakeJobService implements JobService, Serializable {
       return new JobStatus().setState("FAILED").setErrorResult(new 
ErrorProto());
     }
     if (existingTable == null) {
-      existingTable = new 
Table().setTableReference(destination).setSchema(schema);
+      TableReference strippedDestination =
+          destination
+              .clone()
+              
.setTableId(BigQueryHelpers.stripPartitionDecorator(destination.getTableId()));
+      existingTable =
+          new Table()
+              .setTableReference(strippedDestination)
+              .setSchema(schema);
       if (load.getTimePartitioning() != null) {
         existingTable = 
existingTable.setTimePartitioning(load.getTimePartitioning());
       }

-- 
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <commits@beam.apache.org>'].

Reply via email to