[ 
https://issues.apache.org/jira/browse/BEAM-5191?focusedWorklogId=267724&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-267724
 ]

ASF GitHub Bot logged work on BEAM-5191:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 26/Jun/19 15:20
            Start Date: 26/Jun/19 15:20
    Worklog Time Spent: 10m 
      Work Description: jklukas commented on pull request #8945: [BEAM-5191] Support for BigQuery clustering
URL: https://github.com/apache/beam/pull/8945#discussion_r297726367
 
 

 ##########
 File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java
 ##########
 @@ -391,6 +392,93 @@ public void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Ex
     assertEquals(timePartitioning, table.getTimePartitioning());
   }
 
+  @Test
+  public void testClusteringStreamingInserts() throws Exception {
+    testClustering(BigQueryIO.Write.Method.STREAMING_INSERTS);
+  }
+
+  @Test
+  public void testClusteringBatchLoads() throws Exception {
+    testClustering(BigQueryIO.Write.Method.FILE_LOADS);
+  }
+
+  private void testClustering(BigQueryIO.Write.Method insertMethod) throws Exception {
+    TableRow row1 = new TableRow().set("date", "2018-01-01").set("number", "1");
+    TableRow row2 = new TableRow().set("date", "2018-01-02").set("number", "2");
+
+    TimePartitioning timePartitioning = new TimePartitioning().setType("DAY").setField("date");
+    Clustering clustering = new Clustering().setFields(ImmutableList.of("date"));
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("date").setType("DATE"),
+                    new TableFieldSchema().setName("number").setType("INTEGER")));
+    p.apply(Create.of(row1, row2))
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to("project-id:dataset-id.table-id")
+                .withTestServices(fakeBqServices)
+                .withMethod(insertMethod)
+                .withSchema(schema)
+                .withTimePartitioning(timePartitioning)
+                .withClustering(clustering)
+                .withoutValidation());
+    p.run();
+    Table table =
+        fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id"));
+    assertEquals(schema, table.getSchema());
+    assertEquals(timePartitioning, table.getTimePartitioning());
+    assertEquals(clustering, table.getClustering());
+  }
+
+  @Test
+  public void testClusteringTableFunction() throws Exception {
+    TableRow row1 = new TableRow().set("date", "2018-01-01").set("number", "1");
+    TableRow row2 = new TableRow().set("date", "2018-01-02").set("number", "2");
+
+    TimePartitioning timePartitioning = new TimePartitioning().setType("DAY").setField("date");
+    Clustering clustering = new Clustering().setFields(ImmutableList.of("date"));
+    TableSchema schema =
+        new TableSchema()
+            .setFields(
+                ImmutableList.of(
+                    new TableFieldSchema().setName("date").setType("DATE"),
+                    new TableFieldSchema().setName("number").setType("INTEGER")));
+    p.apply(Create.of(row1, row2))
+        .apply(
+            BigQueryIO.writeTableRows()
+                .to(
+                    (ValueInSingleWindow<TableRow> vsw) -> {
+                      String tableSpec =
+                          "project-id:dataset-id.table-" + vsw.getValue().get("number");
+                      return new TableDestination(
+                          tableSpec,
+                          null,
+                          new TimePartitioning().setType("DAY").setField("date"),
+                          new Clustering().setFields(ImmutableList.of("date")));
+                    })
+                .withTestServices(fakeBqServices)
+                .withMethod(BigQueryIO.Write.Method.FILE_LOADS)
+                .withSchema(schema)
+                .enableClustering()
 
 Review comment:
   Notably, this test fails if `enableClustering()` is not called because the 
default `TableDestinationCoderV2` is used and clustering information is dropped 
before the table is created. This is exactly the behavior we want for backwards 
compatibility.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 267724)
    Time Spent: 8h 20m  (was: 8h 10m)

> Add support for writing to BigQuery clustered tables
> ----------------------------------------------------
>
>                 Key: BEAM-5191
>                 URL: https://issues.apache.org/jira/browse/BEAM-5191
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-gcp
>    Affects Versions: 2.6.0
>            Reporter: Robert Sahlin
>            Assignee: Wout Scheepers
>            Priority: Minor
>              Labels: features, newbie
>          Time Spent: 8h 20m
>  Remaining Estimate: 0h
>
> Google recently added support for clustered tables in BigQuery. It would be 
> useful to set clustering columns the same way as for partitioning. It should 
> support clustering on multiple fields (up to four).
> For example:
> [BigQueryIO.Write|https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html]<T>
>  .withClustering(new Clustering().setFields(ImmutableList.of("productId")))
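The write configuration proposed above can be sketched as follows. This is a minimal illustration, not the final API surface of the PR: `rows` and `schema` are assumed to exist, the table spec is a placeholder, and `Clustering`/`TimePartitioning` are the standard `com.google.api.services.bigquery.model` classes (note that `Clustering` takes a list of field names via `setFields`, rather than the per-field `setField`/`setType` shown in the original request).

```java
import java.util.Arrays;
import com.google.api.services.bigquery.model.Clustering;
import com.google.api.services.bigquery.model.TimePartitioning;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;

// Sketch: write to a partitioned table clustered on up to four fields.
// "project-id:dataset-id.table-id" is a placeholder; clustering fields
// must also appear in the supplied schema.
rows.apply(
    BigQueryIO.writeTableRows()
        .to("project-id:dataset-id.table-id")
        .withSchema(schema)
        .withTimePartitioning(
            new TimePartitioning().setType("DAY").setField("date"))
        .withClustering(
            // fields are listed in order of clustering priority
            new Clustering().setFields(Arrays.asList("productId", "date"))));
```

As the review comment above notes, clustering with dynamic table destinations additionally requires opting in (via `enableClustering()` in the PR as reviewed), so that the destination coder preserves the clustering information.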



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
