[ https://issues.apache.org/jira/browse/BEAM-5191?focusedWorklogId=267724&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-267724 ]
ASF GitHub Bot logged work on BEAM-5191: ---------------------------------------- Author: ASF GitHub Bot Created on: 26/Jun/19 15:20 Start Date: 26/Jun/19 15:20 Worklog Time Spent: 10m Work Description: jklukas commented on pull request #8945: [BEAM-5191] Support for BigQuery clustering URL: https://github.com/apache/beam/pull/8945#discussion_r297726367 ########## File path: sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java ########## @@ -391,6 +392,93 @@ public void testTimePartitioning(BigQueryIO.Write.Method insertMethod) throws Ex assertEquals(timePartitioning, table.getTimePartitioning()); } + @Test + public void testClusteringStreamingInserts() throws Exception { + testClustering(BigQueryIO.Write.Method.STREAMING_INSERTS); + } + + @Test + public void testClusteringBatchLoads() throws Exception { + testClustering(BigQueryIO.Write.Method.FILE_LOADS); + } + + public void testClustering(BigQueryIO.Write.Method insertMethod) throws Exception { + TableRow row1 = new TableRow().set("date", "2018-01-01").set("number", "1"); + TableRow row2 = new TableRow().set("date", "2018-01-02").set("number", "2"); + + TimePartitioning timePartitioning = new TimePartitioning().setType("DAY").setField("date"); + Clustering clustering = new Clustering().setFields(ImmutableList.of("date")); + TableSchema schema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("date") + .setType("DATE") + .setName("number") + .setType("INTEGER"))); + p.apply(Create.of(row1, row2)) + .apply( + BigQueryIO.writeTableRows() + .to("project-id:dataset-id.table-id") + .withTestServices(fakeBqServices) + .withMethod(insertMethod) + .withSchema(schema) + .withTimePartitioning(timePartitioning) + .withClustering(clustering) + .withoutValidation()); + p.run(); + Table table = + fakeDatasetService.getTable( + BigQueryHelpers.parseTableSpec("project-id:dataset-id.table-id")); + assertEquals(schema, table.getSchema()); + assertEquals(timePartitioning, table.getTimePartitioning()); + assertEquals(clustering, table.getClustering()); + } + + @Test + public void testClusteringTableFunction() throws Exception { + TableRow row1 = new TableRow().set("date", "2018-01-01").set("number", "1"); + TableRow row2 = new TableRow().set("date", "2018-01-02").set("number", "2"); + + TimePartitioning timePartitioning = new TimePartitioning().setType("DAY").setField("date"); + Clustering clustering = new Clustering().setFields(ImmutableList.of("date")); + TableSchema schema = + new TableSchema() + .setFields( + ImmutableList.of( + new TableFieldSchema() + .setName("date") + .setType("DATE") + .setName("number") + .setType("INTEGER"))); + p.apply(Create.of(row1, row2)) + .apply( + BigQueryIO.writeTableRows() + .to( + (ValueInSingleWindow<TableRow> vsw) -> { + String tableSpec = + "project-id:dataset-id.table-" + vsw.getValue().get("number"); + return new TableDestination( + tableSpec, + null, + new TimePartitioning().setType("DAY").setField("date"), + new Clustering().setFields(ImmutableList.of("date"))); + }) + .withTestServices(fakeBqServices) + .withMethod(BigQueryIO.Write.Method.FILE_LOADS) + .withSchema(schema) + .enableClustering() Review comment: Notably, this test fails if `enableClustering()` is not called because the default `TableDestinationCoderV2` is used and clustering information is dropped before the table is created. This is exactly the behavior we want for backwards compatibility. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 267724) Time Spent: 8h 20m (was: 8h 10m) > Add support for writing to BigQuery clustered tables > ---------------------------------------------------- > > Key: BEAM-5191 > URL: https://issues.apache.org/jira/browse/BEAM-5191 > Project: Beam > Issue Type: Improvement > Components: io-java-gcp > Affects Versions: 2.6.0 > Reporter: Robert Sahlin > Assignee: Wout Scheepers > Priority: Minor > Labels: features, newbie > Time Spent: 8h 20m > Remaining Estimate: 0h > > Google recently added support for clustered tables in BigQuery. It would be > useful to set clustering columns the same way as for partitioning. It should > support multiple fields (4) for clustering. > For example: > [BigQueryIO.Write|https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html]<[T|https://beam.apache.org/documentation/sdks/javadoc/2.6.0/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.Write.html]> > .withClustering(new Clustering().setField("productId").setType("STRING")) -- This message was sent by Atlassian JIRA (v7.6.3#76005)