[https://issues.apache.org/jira/browse/BEAM-13990?focusedWorklogId=763931&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-763931]
ASF GitHub Bot logged work on BEAM-13990:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 28/Apr/22 23:31
Start Date: 28/Apr/22 23:31
Worklog Time Spent: 10m
Work Description: yirutang commented on code in PR #17404:
URL: https://github.com/apache/beam/pull/17404#discussion_r861374699
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -84,10 +143,10 @@ public static class SchemaDoesntMatchException extends SchemaConversionException
.put("NUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
.put("BIGNUMERIC", Type.TYPE_STRING) // Pass through the JSON encoding.
.put("GEOGRAPHY", Type.TYPE_STRING) // Pass through the JSON encoding.
- .put("DATE", Type.TYPE_STRING) // Pass through the JSON encoding.
+ .put("DATE", Type.TYPE_INT32)
.put("TIME", Type.TYPE_STRING) // Pass through the JSON encoding.
.put("DATETIME", Type.TYPE_STRING) // Pass through the JSON encoding.
Review Comment:
TIME and DATETIME could be encoded as INT64 so that the encoded size is smaller.
This shouldn't block this PR; see:
https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessage.java#L334
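To make the INT64 suggestion concrete, the sketch below bit-packs civil TIME and DATETIME values into a single `long`. The layout (20 bits of microseconds, then 6 bits of seconds, 6 of minutes, 5 of hours, and for DATETIME 5 bits of day, 4 of month and 14 of year) is our assumption about the packed civil-time format used by java-bigquerystorage's `CivilTimeEncoder`; it is not taken from this PR, so check it against that library before relying on it. `PackedCivilTimeSketch` and its methods are hypothetical names.

```java
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical helper: packs civil TIME/DATETIME values into one long (assumed layout). */
class PackedCivilTimeSketch {
  // Assumed field widths, least significant field first.
  private static final int MICRO_BITS = 20;
  private static final int SECOND_BITS = 6;
  private static final int MINUTE_BITS = 6;
  private static final int HOUR_BITS = 5;
  private static final int DAY_BITS = 5;
  private static final int MONTH_BITS = 4;

  private static final int SECOND_SHIFT = MICRO_BITS; // 20
  private static final int MINUTE_SHIFT = SECOND_SHIFT + SECOND_BITS; // 26
  private static final int HOUR_SHIFT = MINUTE_SHIFT + MINUTE_BITS; // 32
  private static final int DAY_SHIFT = HOUR_SHIFT + HOUR_BITS; // 37
  private static final int MONTH_SHIFT = DAY_SHIFT + DAY_BITS; // 42
  private static final int YEAR_SHIFT = MONTH_SHIFT + MONTH_BITS; // 46

  /** Packs hour/minute/second/micros into the low 37 bits; sub-microsecond digits are dropped. */
  static long encodeTimeMicros(LocalTime time) {
    long micros = time.getNano() / 1_000;
    return ((long) time.getHour() << HOUR_SHIFT)
        | ((long) time.getMinute() << MINUTE_SHIFT)
        | ((long) time.getSecond() << SECOND_SHIFT)
        | micros;
  }

  /** Packs year/month/day above the TIME bits. */
  static long encodeDatetimeMicros(LocalDateTime dateTime) {
    return ((long) dateTime.getYear() << YEAR_SHIFT)
        | ((long) dateTime.getMonthValue() << MONTH_SHIFT)
        | ((long) dateTime.getDayOfMonth() << DAY_SHIFT)
        | encodeTimeMicros(dateTime.toLocalTime());
  }

  public static void main(String[] args) {
    LocalDateTime dt = LocalDateTime.parse("2019-08-16T00:52:07.123456");
    System.out.println(encodeTimeMicros(dt.toLocalTime()));
    System.out.println(encodeDatetimeMicros(dt));
  }
}
```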
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java:
##########
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import com.google.api.services.bigquery.model.Table;
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableReference;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunctions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.BaseEncoding;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(JUnit4.class)
+@SuppressWarnings({
+ "nullness" // TODO(https://issues.apache.org/jira/browse/BEAM-10402)
+})
+/** Unit tests for {@link TableRowToStorageApiProto}. */
+public class TableRowToStorageApiProtoIT {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TableRowToStorageApiProtoIT.class);
+ private static final BigqueryClient BQ_CLIENT = new BigqueryClient("TableRowToStorageApiProtoIT");
+ private static final String PROJECT =
+ TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+ private static final String BIG_QUERY_DATASET_ID =
+ "table_row_to_storage_api_proto_" + System.nanoTime();
+
+ private static final TableSchema BASE_TABLE_SCHEMA =
+ new TableSchema()
+ .setFields(
+ ImmutableList.<TableFieldSchema>builder()
+ .add(new TableFieldSchema().setType("STRING").setName("stringValue"))
+ .add(new TableFieldSchema().setType("BYTES").setName("bytesValue"))
+ .add(new TableFieldSchema().setType("INT64").setName("int64Value"))
+ .add(new TableFieldSchema().setType("INTEGER").setName("intValue"))
+ .add(new TableFieldSchema().setType("FLOAT64").setName("float64Value"))
+ .add(new TableFieldSchema().setType("FLOAT").setName("floatValue"))
+ .add(new TableFieldSchema().setType("BOOL").setName("boolValue"))
+ .add(new TableFieldSchema().setType("BOOLEAN").setName("booleanValue"))
+ .add(new TableFieldSchema().setType("TIMESTAMP").setName("timestampValue"))
+ .add(new TableFieldSchema().setType("TIME").setName("timeValue"))
+ .add(new TableFieldSchema().setType("DATETIME").setName("datetimeValue"))
+ .add(new TableFieldSchema().setType("DATE").setName("dateValue"))
+ .add(new TableFieldSchema().setType("NUMERIC").setName("numericValue"))
+ .add(
+ new TableFieldSchema()
+ .setType("STRING")
+ .setMode("REPEATED")
+ .setName("arrayValue"))
+ .build());
+
+ private static final TableRow BASE_TABLE_ROW =
+ new TableRow()
+ .set("stringValue", "string")
+ .set(
+ "bytesValue", BaseEncoding.base64().encode("string".getBytes(StandardCharsets.UTF_8)))
+ .set("int64Value", "42")
+ .set("intValue", "43")
+ .set("float64Value", "2.8168")
+ .set("floatValue", "2.817")
+ .set("boolValue", "true")
+ .set("booleanValue", "true")
+ .set("timestampValue", "1970-01-01T00:00:00.000043Z")
+ .set("timeValue", "00:52:07.123456")
+ .set("datetimeValue", "2019-08-16T00:52:07.123456")
+ .set("dateValue", "2019-08-16")
+ .set("numericValue", "23.4")
+ .set(
+ "arrayValue",
+ Arrays.asList("hello", "goodbye", null)); // null in arrayValue should be removed
+
+ private static final TableRow BASE_TABLE_ROW_JODA_TIME =
+ new TableRow()
+ .set("stringValue", "string")
+ .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+ .set("int64Value", 42)
+ .set("intValue", 43)
+ .set("float64Value", 2.8168f)
+ .set("floatValue", 2.817f)
+ .set("boolValue", true)
+ .set("booleanValue", true)
+ .set("timestampValue", org.joda.time.Instant.parse("1970-01-01T00:00:00.000043Z"))
+ .set("timeValue", org.joda.time.LocalTime.parse("00:52:07.123456"))
+ .set("datetimeValue", org.joda.time.LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+ .set("dateValue", org.joda.time.LocalDate.parse("2019-08-16"))
+ .set("numericValue", new BigDecimal("23.4"))
+ .set("arrayValue", ImmutableList.of("hello", "goodbye"));
+
+ private static final TableRow BASE_TABLE_ROW_JAVA_TIME =
+ new TableRow()
+ .set("stringValue", "string")
+ .set("bytesValue", "string".getBytes(StandardCharsets.UTF_8))
+ .set("int64Value", 42)
+ .set("intValue", 43)
+ .set("float64Value", 2.8168f)
+ .set("floatValue", 2.817f)
+ .set("boolValue", true)
+ .set("booleanValue", true)
+ .set("timestampValue", Instant.parse("1970-01-01T00:00:00.000043Z"))
+ .set("timeValue", LocalTime.parse("00:52:07.123456"))
+ .set("datetimeValue", LocalDateTime.parse("2019-08-16T00:52:07.123456"))
+ .set("dateValue", LocalDate.parse("2019-08-16"))
+ .set("numericValue", new BigDecimal("23.4"))
Review Comment:
int and double timestamp value test?
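As a starting point for the int and double cases, here is a hedged sketch of what such a test could assert. `TimestampMicrosSketch.toEpochMicros` is a hypothetical standalone copy of the `Instant` and numeric cases of the TIMESTAMP branch in this PR: integral values are passed through as microseconds since the epoch, and floating-point values are treated as seconds since the epoch, matching the assumption stated in that branch's comment.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Instant;
import java.time.temporal.ChronoUnit;

class TimestampMicrosSketch {
  /** Hypothetical helper mirroring the Instant/int/double cases of the TIMESTAMP conversion. */
  static long toEpochMicros(Object value) {
    if (value instanceof Instant) {
      return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
    } else if (value instanceof Integer || value instanceof Long) {
      // Integral values are taken to already be microseconds since the epoch.
      return ((Number) value).longValue();
    } else if (value instanceof Double || value instanceof Float) {
      // Floating-point values are taken to be seconds since the epoch.
      return BigDecimal.valueOf(((Number) value).doubleValue())
          .scaleByPowerOfTen(6)
          .setScale(0, RoundingMode.HALF_UP)
          .longValue();
    }
    throw new IllegalArgumentException("Unsupported TIMESTAMP value: " + value);
  }

  public static void main(String[] args) {
    // All three inputs describe the same instant as the test row's timestampValue.
    System.out.println(toEpochMicros(Instant.parse("1970-01-01T00:00:00.000043Z"))); // 43
    System.out.println(toEpochMicros(43)); // 43
    System.out.println(toEpochMicros(0.000043)); // 43
  }
}
```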
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProtoIT.java:
##########
@@ -0,0 +1,308 @@
+ .set("numericValue", new BigDecimal("23.4"))
Review Comment:
bignumeric test?
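A BIGNUMERIC column would exercise values that NUMERIC (precision 38, scale 9) cannot hold. The sketch below uses two hypothetical test values and only checks the string that the `BigDecimal` branch would send via `toPlainString()`; it does not call BigQuery. It also illustrates a reason to prefer `toPlainString()` over `toString()`: the latter switches to scientific notation for these values.

```java
import java.math.BigDecimal;

class BigNumericValueSketch {
  // Hypothetical BIGNUMERIC test values: the smallest positive value at scale 38,
  // and an integer with more than the 29 integer digits NUMERIC allows.
  static final BigDecimal TINY = new BigDecimal("1E-38");
  static final BigDecimal LARGE = new BigDecimal("1E+30");

  public static void main(String[] args) {
    // toString() uses scientific notation for both values...
    System.out.println(TINY.toString()); // 1E-38
    System.out.println(LARGE.toString()); // 1E+30
    // ...while toPlainString(), as used in the NUMERIC/BIGNUMERIC branch, writes every digit.
    System.out.println(TINY.toPlainString());
    System.out.println(LARGE.toPlainString());
  }
}
```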
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -523,14 +523,6 @@ public void testTimePartitioning() throws Exception {
testTimePartitioning(method);
}
- @Test
- public void testTimePartitioningStorageApi() throws Exception {
Review Comment:
Why remove this?
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -218,125 +288,157 @@ private static void fieldDescriptorFromTableField(
}
@Nullable
+ @SuppressWarnings({"nullness"})
private static Object messageValueFromFieldValue(
- FieldDescriptor fieldDescriptor, @Nullable Object bqValue, boolean ignoreUnknownValues)
+ SchemaInformation schemaInformation,
+ FieldDescriptor fieldDescriptor,
+ @Nullable Object bqValue,
+ boolean ignoreUnknownValues)
throws SchemaConversionException {
if (bqValue == null) {
if (fieldDescriptor.isOptional()) {
return null;
} else if (fieldDescriptor.isRepeated()) {
return Collections.emptyList();
- }
- {
+ } else {
throw new IllegalArgumentException(
"Received null value for non-nullable field " + fieldDescriptor.getName());
}
}
- return toProtoValue(
- fieldDescriptor, bqValue, fieldDescriptor.isRepeated(), ignoreUnknownValues);
- }
-
- private static final Map<FieldDescriptor.Type, Function<String, Object>>
- JSON_PROTO_STRING_PARSERS =
- ImmutableMap.<FieldDescriptor.Type, Function<String, Object>>builder()
- .put(FieldDescriptor.Type.INT32, Integer::valueOf)
- .put(FieldDescriptor.Type.INT64, Long::valueOf)
- .put(FieldDescriptor.Type.FLOAT, Float::valueOf)
- .put(FieldDescriptor.Type.DOUBLE, Double::valueOf)
- .put(FieldDescriptor.Type.BOOL, Boolean::valueOf)
- .put(FieldDescriptor.Type.STRING, str -> str)
- .put(
- FieldDescriptor.Type.BYTES,
- b64 -> ByteString.copyFrom(BaseEncoding.base64().decode(b64)))
- .build();
- @Nullable
- @SuppressWarnings({"nullness"})
- @VisibleForTesting
- static Object toProtoValue(
- FieldDescriptor fieldDescriptor,
- Object jsonBQValue,
- boolean isRepeated,
- boolean ignoreUnknownValues)
- throws SchemaConversionException {
- if (isRepeated) {
- List<Object> listValue = (List<Object>) jsonBQValue;
+ if (fieldDescriptor.isRepeated()) {
+ List<Object> listValue = (List<Object>) bqValue;
List<Object> protoList =
Lists.newArrayListWithCapacity(listValue.size());
- for (Object o : listValue) {
- protoList.add(toProtoValue(fieldDescriptor, o, false, ignoreUnknownValues));
+ for (@Nullable Object o : listValue) {
+ if (o != null) { // repeated field cannot contain null.
+ protoList.add(
+ singularFieldToProtoValue(
+ schemaInformation, fieldDescriptor, o, ignoreUnknownValues));
+ }
}
return protoList;
}
-
- if (fieldDescriptor.getType() == FieldDescriptor.Type.MESSAGE) {
- if (jsonBQValue instanceof TableRow) {
- TableRow tableRow = (TableRow) jsonBQValue;
- return messageFromTableRow(fieldDescriptor.getMessageType(), tableRow, ignoreUnknownValues);
- } else if (jsonBQValue instanceof AbstractMap) {
- // This will handle nested rows.
- AbstractMap<String, Object> map = ((AbstractMap<String, Object>) jsonBQValue);
- return messageFromMap(fieldDescriptor.getMessageType(), map, ignoreUnknownValues);
- } else {
- throw new RuntimeException("Unexpected value " + jsonBQValue + " Expected a JSON map.");
- }
- }
- @Nullable Object scalarValue = scalarToProtoValue(fieldDescriptor, jsonBQValue);
- if (scalarValue == null) {
- return toProtoValue(fieldDescriptor, jsonBQValue.toString(), isRepeated, ignoreUnknownValues);
- } else {
- return scalarValue;
- }
+ return singularFieldToProtoValue(
+ schemaInformation, fieldDescriptor, bqValue, ignoreUnknownValues);
}
@VisibleForTesting
@Nullable
- static Object scalarToProtoValue(FieldDescriptor fieldDescriptor, Object jsonBQValue) {
- if (jsonBQValue instanceof String) {
- Function<String, Object> mapper = JSON_PROTO_STRING_PARSERS.get(fieldDescriptor.getType());
- if (mapper == null) {
- throw new UnsupportedOperationException(
- "Converting BigQuery type '"
- + jsonBQValue.getClass()
- + "' to '"
- + fieldDescriptor
- + "' is not supported");
- }
- return mapper.apply((String) jsonBQValue);
- }
-
- switch (fieldDescriptor.getType()) {
- case BOOL:
- if (jsonBQValue instanceof Boolean) {
- return jsonBQValue;
+ static Object singularFieldToProtoValue(
+ SchemaInformation schemaInformation,
+ FieldDescriptor fieldDescriptor,
+ Object value,
+ boolean ignoreUnknownValues)
+ throws SchemaConversionException {
+ switch (schemaInformation.getType()) {
+ case "INT64":
+ case "INTEGER":
+ if (value instanceof String) {
+ return Long.valueOf((String) value);
+ } else if (value instanceof Integer || value instanceof Long) {
+ return ((Number) value).longValue();
}
break;
- case BYTES:
+ case "FLOAT64":
+ case "FLOAT":
+ if (value instanceof String) {
+ return Double.valueOf((String) value);
+ } else if (value instanceof Double || value instanceof Float) {
+ return ((Number) value).doubleValue();
+ }
break;
- case INT64:
- if (jsonBQValue instanceof Integer) {
- return Long.valueOf((Integer) jsonBQValue);
- } else if (jsonBQValue instanceof Long) {
- return jsonBQValue;
+ case "BOOLEAN":
+ case "BOOL":
+ if (value instanceof String) {
+ return Boolean.valueOf((String) value);
+ } else if (value instanceof Boolean) {
+ return value;
}
break;
- case INT32:
- if (jsonBQValue instanceof Integer) {
- return jsonBQValue;
+ case "BYTES":
+ if (value instanceof String) {
+ return ByteString.copyFrom(BaseEncoding.base64().decode((String) value));
+ } else if (value instanceof byte[]) {
+ return ByteString.copyFrom((byte[]) value);
+ } else if (value instanceof ByteString) {
+ return value;
}
break;
- case STRING:
+ case "TIMESTAMP":
+ if (value instanceof String) {
+ try {
+ return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse((String) value));
+ } catch (DateTimeParseException e) {
+ return ChronoUnit.MICROS.between(
+ Instant.EPOCH, Instant.ofEpochMilli(Long.parseLong((String) value)));
+ }
+ } else if (value instanceof Instant) {
+ return ChronoUnit.MICROS.between(Instant.EPOCH, (Instant) value);
+ } else if (value instanceof org.joda.time.Instant) {
+ // joda instant precision is millisecond
+ return ((org.joda.time.Instant) value).getMillis() * 1000L;
+ } else if (value instanceof Integer || value instanceof Long) {
+ return ((Number) value).longValue();
+ } else if (value instanceof Double || value instanceof Float) {
+ // assume value represents number of seconds since epoch
+ return BigDecimal.valueOf(((Number) value).doubleValue())
+ .scaleByPowerOfTen(6)
+ .setScale(0, RoundingMode.HALF_UP)
+ .longValue();
+ }
break;
- case DOUBLE:
- if (jsonBQValue instanceof Double) {
- return jsonBQValue;
- } else if (jsonBQValue instanceof Float) {
- return Double.valueOf((Float) jsonBQValue);
+ case "DATE":
+ if (value instanceof String) {
+ return ((Long) LocalDate.parse((String) value).toEpochDay()).intValue();
+ } else if (value instanceof LocalDate) {
+ return ((Long) ((LocalDate) value).toEpochDay()).intValue();
+ } else if (value instanceof org.joda.time.LocalDate) {
+ return Days.daysBetween(
+ org.joda.time.Instant.EPOCH.toDateTime().toLocalDate(),
+ (org.joda.time.LocalDate) value)
+ .getDays();
+ }
+ break;
+ case "NUMERIC":
+ case "BIGNUMERIC":
+ if (value instanceof String) {
+ return value;
+ } else if (value instanceof BigDecimal) {
+ return ((BigDecimal) value).toPlainString();
Review Comment:
We do provide an encoder for this here:
https://github.com/googleapis/java-bigquerystorage/blob/main/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/BigDecimalByteStringEncoder.java
so that these values can be passed in more efficiently. Not blocking this PR.
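For comparison with the string pass-through, the sketch below shows one possible binary form for NUMERIC. It assumes (from our reading of `BigDecimalByteStringEncoder`, not verified here) that a NUMERIC value is sent as its unscaled integer at scale 9, as little-endian two's-complement bytes. `NumericBytesSketch` is hypothetical, and rejecting values with more than 9 fractional digits is our own choice; real code should call the library encoder instead.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.math.RoundingMode;

class NumericBytesSketch {
  // NUMERIC has a fixed scale of 9 fractional digits.
  static final int NUMERIC_SCALE = 9;

  /** Hypothetical encoder: unscaled value at scale 9, little-endian two's complement. */
  static byte[] encodeNumeric(BigDecimal value) {
    // Throws ArithmeticException if more than 9 fractional digits would need rounding.
    BigInteger unscaled = value.setScale(NUMERIC_SCALE, RoundingMode.UNNECESSARY).unscaledValue();
    byte[] bigEndian = unscaled.toByteArray(); // BigInteger produces big-endian bytes
    return reverse(bigEndian);
  }

  /** Inverse of encodeNumeric, useful for round-trip checks. */
  static BigDecimal decodeNumeric(byte[] littleEndian) {
    return new BigDecimal(new BigInteger(reverse(littleEndian)), NUMERIC_SCALE);
  }

  private static byte[] reverse(byte[] in) {
    byte[] out = new byte[in.length];
    for (int i = 0; i < in.length; i++) {
      out[i] = in[in.length - 1 - i];
    }
    return out;
  }

  public static void main(String[] args) {
    byte[] encoded = encodeNumeric(new BigDecimal("23.4"));
    System.out.println(encoded.length + " bytes -> " + decodeNumeric(encoded));
  }
}
```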
Issue Time Tracking
-------------------
Worklog Id: (was: 763931)
Remaining Estimate: 100.5h (was: 100h 40m)
Time Spent: 19.5h (was: 19h 20m)
> BigQueryIO cannot write to DATE and TIMESTAMP columns when using Storage Write API
> -----------------------------------------------------------------------------------
>
> Key: BEAM-13990
> URL: https://issues.apache.org/jira/browse/BEAM-13990
> Project: Beam
> Issue Type: Improvement
> Components: io-java-gcp
> Affects Versions: 2.36.0
> Reporter: Du Liu
> Assignee: Du Liu
> Priority: P2
> Original Estimate: 120h
> Time Spent: 19.5h
> Remaining Estimate: 100.5h
>
> When using the Storage Write API with BigQueryIO, DATE and TIMESTAMP values are
> currently converted to the String type in the protobuf message. This is incorrect:
> according to the Storage Write API [documentation|#data_type_conversions], DATE
> should be converted to int32 and TIMESTAMP should be converted to int64.
> Here is the error message:
> INFO: Stream finished with error
> com.google.api.gax.rpc.InvalidArgumentException:
> io.grpc.StatusRuntimeException: INVALID_ARGUMENT: The proto field mismatched
> with BigQuery field at D6cbe536b_4dab_4292_8fda_ff2932dded49.datevalue, the
> proto field type string, BigQuery field type DATE Entity
> I have included an integration test here:
> [https://github.com/liu-du/beam/commit/b56823d1d213adf6ca5564ce1d244cc4ae8f0816]
>
> The problem is that DATE and TIMESTAMP are converted to String in the protobuf
> message here:
> [https://github.com/apache/beam/blob/a78fec72d0d9198eef75144a7bdaf93ada5abf9b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java#L69]
>
> The Storage Write API rejects the request because it expects int32/int64
> values.
>
> I've opened a PR here: https://github.com/apache/beam/pull/16926
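The conversions the issue asks for can be sketched with `java.time` alone, assuming DATE is sent as days since 1970-01-01 (int32) and TIMESTAMP as microseconds since the Unix epoch (int64), matching the `toEpochDay()` and `ChronoUnit.MICROS` conversions in the PR diff above. `StorageWriteTemporalSketch` is a hypothetical name and this is not the code merged for BEAM-13990; the sample values are the ones used in the integration test.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

class StorageWriteTemporalSketch {
  /** DATE -> days since 1970-01-01, narrowed to int (throws ArithmeticException on overflow). */
  static int dateToEpochDays(String isoDate) {
    return Math.toIntExact(LocalDate.parse(isoDate).toEpochDay());
  }

  /** TIMESTAMP -> microseconds since 1970-01-01T00:00:00Z. */
  static long timestampToEpochMicros(String isoInstant) {
    return ChronoUnit.MICROS.between(Instant.EPOCH, Instant.parse(isoInstant));
  }

  public static void main(String[] args) {
    System.out.println(dateToEpochDays("2019-08-16")); // 18124
    System.out.println(timestampToEpochMicros("1970-01-01T00:00:00.000043Z")); // 43
  }
}
```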
--
This message was sent by Atlassian Jira
(v8.20.7#820007)