[ https://issues.apache.org/jira/browse/BEAM-13945?focusedWorklogId=756018&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756018 ]
ASF GitHub Bot logged work on BEAM-13945:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 12/Apr/22 20:31
Start Date: 12/Apr/22 20:31
Worklog Time Spent: 10m
Work Description: chamikaramj commented on code in PR #17209:
URL: https://github.com/apache/beam/pull/17209#discussion_r848844479
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJSONIT.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+public class BigQueryIOJSONIT {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOJSONIT.class);
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline p_write = TestPipeline.create();
+
+  private BigQueryIOJSONOptions options;
+
+  private static String project;
+
+  private static final String DATASET_ID = "bq_jsontype_test_nodelete";
+
+  private static final String JSON_TYPE_TABLE_NAME = "json_data";
+
+  private static String JSON_TABLE_DESTINATION;
+
+  private static final TableSchema JSON_TYPE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(
+              ImmutableList.of(
+                  new TableFieldSchema().setName("country_code").setType("STRING"),
+                  new TableFieldSchema().setName("country").setType("JSON")));
+
+  public static final String STORAGE_WRITE_TEST_TABLE =
+      "storagewrite_test" + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+
+  private static final Map<String, String> JSON_TYPE_DATA = generateCountryData(false);
+
+  // Convert PCollection of TableRows to a PCollection of KV JSON string pairs
+  static class TableRowToJSONStringFn extends DoFn<TableRow, KV<String, String>> {
+    @ProcessElement
+    public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out) {
+      String country_code = row.get("country_code").toString();
+      String country = row.get("country").toString();
+
+      out.output(KV.of(country_code, country));
+    }
+  }
+
+  // Compare PCollection input with expected results.
+  static class CompareJSON implements SerializableFunction<Iterable<KV<String, String>>, Void> {
+    Map<String, String> expected;
+
+    public CompareJSON(Map<String, String> expected) {
+      this.expected = expected;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<String, String>> input) throws RuntimeException {
+      int counter = 0;
+
+      // Iterate through the input and convert each String to a JsonElement,
+      // then compare with the expected JsonElements.
+      for (KV<String, String> actual : input) {
+        String key = actual.getKey();
+
+        if (!expected.containsKey(key)) {
+          throw new NoSuchElementException(
+              String.format(
+                  "Unexpected key '%s' found in input but missing from expected results.", key));
+        }
+        String jsonStringActual = actual.getValue();
+        JsonElement jsonActual = JsonParser.parseString(jsonStringActual);
+
+        String jsonStringExpected = expected.get(key);
+        JsonElement jsonExpected = JsonParser.parseString(jsonStringExpected);
+
+        assertEquals(jsonExpected, jsonActual);
+        counter += 1;
+      }
+      if (counter != expected.size()) {
+        throw new RuntimeException(
+            String.format("Expected %d elements but got %d elements.", expected.size(), counter));
+      }
+      return null;
+    }
+  }
+
+  public void runTestWrite(BigQueryIOJSONOptions options) {
+    List<TableRow> rowsToWrite = new ArrayList<>();
+    for (Map.Entry<String, String> element : JSON_TYPE_DATA.entrySet()) {
+      rowsToWrite.add(
+          new TableRow()
+              .set("country_code", element.getKey())
+              .set("country", element.getValue()));
+    }
+
+    p_write
+        .apply("Create Elements", Create.of(rowsToWrite))
+        .apply(
+            "Write To BigQuery",
+            BigQueryIO.writeTableRows()
+                .to(options.getOutput())
+                .withSchema(JSON_TYPE_TABLE_SCHEMA)
+                .withCreateDisposition(options.getCreateDisposition())
+                .withMethod(options.getWriteMethod()));
+    p_write.run().waitUntilFinish();
+
+    options.setReadMethod(TypedRead.Method.EXPORT);
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  // Reads TableRows from BigQuery and validates the JSON strings.
+  // The expectedResults values must be valid JSON.
+  public void readAndValidateRows(
+      BigQueryIOJSONOptions options, Map<String, String> expectedResults) {
+    TypedRead<TableRow> bigqueryIO =
+        BigQueryIO.readTableRows().withMethod(options.getReadMethod());
+
+    // Read from the input query or from the table.
+    if (!options.getQuery().isEmpty()) {
+      bigqueryIO = bigqueryIO.fromQuery(options.getQuery()).usingStandardSql();
+    } else {
+      bigqueryIO = bigqueryIO.from(options.getInput());
+    }
+
+    PCollection<KV<String, String>> jsonKVPairs =
+        p.apply("Read rows", bigqueryIO)
+            .apply("Convert to KV JSON Strings", ParDo.of(new TableRowToJSONStringFn()));
+
+    PAssert.that(jsonKVPairs).satisfies(new CompareJSON(expectedResults));
+
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testDirectRead() throws Exception {
+    LOG.info("Testing DIRECT_READ read method with JSON data");
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.DIRECT_READ);
+    options.setInput(JSON_TABLE_DESTINATION);
+
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  @Test
+  public void testExportRead() throws Exception {
+    LOG.info("Testing EXPORT read method with JSON data");
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.EXPORT);
+    options.setInput(JSON_TABLE_DESTINATION);
+
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  @Test
+  public void testQueryRead() throws Exception {
+    LOG.info("Testing querying JSON data with DIRECT_READ read method");
+
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.DIRECT_READ);
+    options.setQuery(
+        String.format(
+            "SELECT country_code, country.cities AS country FROM `%s.%s.%s`",
+            project, DATASET_ID, JSON_TYPE_TABLE_NAME));
+
+    // Get nested JSON objects from the static data.
+    Map<String, String> expected = generateCountryData(true);
+
+    readAndValidateRows(options, expected);
+  }
+
+  @Test
+  public void testStorageWrite() throws Exception {
+    LOG.info("Testing writing JSON data with Storage API");
+
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setWriteMethod(Write.Method.STORAGE_WRITE_API);
+
+    String storage_destination =
+        String.format("%s:%s.%s", project, DATASET_ID, STORAGE_WRITE_TEST_TABLE);
+    options.setOutput(storage_destination);
+    options.setInput(storage_destination);
+
+    runTestWrite(options);
+  }
+
+  @Test
+  public void testLegacyStreamingWrite() throws Exception {
Review Comment:
Should we add this to the Javadocs?
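For reference, a possible Javadoc note could look like the sketch below. This is illustrative only: the wording and its placement in BigQueryIO's class-level docs are assumptions, not part of the PR.
{code:java}
// Hypothetical Javadoc addition for BigQueryIO (wording is an assumption):
/**
 * <h3>BigQuery JSON type</h3>
 *
 * <p>Columns declared with the BigQuery {@code JSON} type are carried in
 * {@link TableRow} fields as JSON-formatted {@link String}s, both when
 * reading and when writing. For example:
 *
 * <pre>{@code
 * TableRow row = new TableRow().set("country", "{\"name\": \"Canada\"}");
 * }</pre>
 */
{code}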
Issue Time Tracking
-------------------
Worklog Id: (was: 756018)
Time Spent: 3h 50m (was: 3h 40m)
> Update BQ connector to support new JSON type
> --------------------------------------------
>
> Key: BEAM-13945
> URL: https://issues.apache.org/jira/browse/BEAM-13945
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Chamikara Madhusanka Jayalath
> Assignee: Ahmed Abualsaud
> Priority: P2
> Time Spent: 3h 50m
> Remaining Estimate: 0h
>
> BQ has a new JSON type that is defined here:
> https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
> We should update the Beam BQ Java and Python connectors to support it for the
> various read methods (export jobs, Storage API) and write methods (load jobs,
> streaming inserts, Storage API).
> We should also add integration tests that exercise reading from / writing to
> BQ tables with columns that have the JSON type.
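To make the intended usage concrete, below is a minimal sketch of writing to and then reading from a table with a JSON column, modeled on the test in this PR. The writePipeline/readPipeline objects and the destination "my_project:my_dataset.json_table" are hypothetical placeholders.
{code:java}
// Schema with a JSON-typed column, as in the PR's test.
TableSchema schema =
    new TableSchema()
        .setFields(
            ImmutableList.of(
                new TableFieldSchema().setName("country_code").setType("STRING"),
                new TableFieldSchema().setName("country").setType("JSON")));

// JSON values travel as JSON-formatted Strings inside TableRow.
TableRow row =
    new TableRow().set("country_code", "ca").set("country", "{\"name\": \"Canada\"}");

writePipeline
    .apply("Create", Create.of(row))
    .apply(
        "Write",
        BigQueryIO.writeTableRows()
            .to("my_project:my_dataset.json_table") // hypothetical destination
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));
writePipeline.run().waitUntilFinish();

// Reading back: each JSON column arrives as a String that can be parsed,
// e.g. with JsonParser.parseString(...), as the test's CompareJSON does.
readPipeline.apply(
    "Read",
    BigQueryIO.readTableRows()
        .from("my_project:my_dataset.json_table")
        .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ));
{code}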