gemini-code-assist[bot] commented on code in PR #36425:
URL: https://github.com/apache/beam/pull/36425#discussion_r2453907497
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -221,6 +189,232 @@ private static String getPrettyFieldName(SchemaInformation schema) {
.put(TableFieldSchema.Type.JSON, "JSON")
.build();
+  @FunctionalInterface
+  public interface ThrowingBiFunction<FirstInputT, SecondInputT, OutputT> {
+    OutputT apply(FirstInputT t, SecondInputT u) throws SchemaConversionException;
+  }
+
+  static final DecimalFormat DECIMAL_FORMAT = new DecimalFormat("0.0###############");
Review Comment:

The `DecimalFormat` constructor without a `Locale` is locale-sensitive. This
can cause issues when running in different environments, as the decimal
separator might be a comma instead of a period, which would not be correctly
interpreted by BigQuery. To ensure consistent behavior, it's best to specify a
locale that uses a period as the decimal separator, such as `Locale.ROOT`.
You will also need to add the following imports:
```java
import java.text.DecimalFormatSymbols;
import java.util.Locale;
```
```suggestion
  static final DecimalFormat DECIMAL_FORMAT =
      new DecimalFormat("0.0###############", DecimalFormatSymbols.getInstance(Locale.ROOT));
```
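
To make the failure mode concrete, here is a minimal standalone sketch (not part of the PR) showing how the one-argument constructor picks up the JVM's default locale, while pinning the symbols to `Locale.ROOT` keeps the output stable:
```java
import java.text.DecimalFormat;
import java.text.DecimalFormatSymbols;
import java.util.Locale;

public class DecimalFormatLocaleDemo {
  public static void main(String[] args) {
    // Simulate a worker JVM whose default locale uses a comma as the decimal separator.
    Locale.setDefault(Locale.GERMANY);

    // Locale-sensitive: inherits the default locale's symbols.
    DecimalFormat sensitive = new DecimalFormat("0.0###############");
    System.out.println(sensitive.format(2.5)); // prints "2,5" -- not valid for BigQuery

    // Locale-pinned: always formats with a period, regardless of the JVM default.
    DecimalFormat pinned =
        new DecimalFormat("0.0###############", DecimalFormatSymbols.getInstance(Locale.ROOT));
    System.out.println(pinned.format(2.5)); // prints "2.5"
  }
}
```
Pinning only the symbols keeps the existing pattern untouched, so the output format is otherwise identical.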
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -1282,68 +1340,378 @@ private static long toEpochMicros(Instant timestamp) {
   @VisibleForTesting
   public static TableRow tableRowFromMessage(
-      Message message, boolean includeCdcColumns, Predicate<String> includeField) {
-    return tableRowFromMessage(message, includeCdcColumns, includeField, "");
+      SchemaInformation schemaInformation,
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField) {
+    return tableRowFromMessage(schemaInformation, message, includeCdcColumns, includeField, "");
   }

   public static TableRow tableRowFromMessage(
+      SchemaInformation schemaInformation,
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField,
+      String namePrefix) {
+    // We first try to create a map-style TableRow for backwards compatibility with existing
+    // usage. However, this will fail if there is a column named "f". If it fails, we instead
+    // create a list-style TableRow.
+    Optional<TableRow> tableRow =
+        tableRowFromMessageNoF(
+            schemaInformation, message, includeCdcColumns, includeField, namePrefix);
+    return tableRow.orElseGet(
+        () ->
+            tableRowFromMessageUseSetF(
+                schemaInformation, message, includeCdcColumns, includeField, ""));
+  }
+
+  private static Optional<TableRow> tableRowFromMessageNoF(
+      SchemaInformation schemaInformation,
       Message message,
       boolean includeCdcColumns,
       Predicate<String> includeField,
       String namePrefix) {
-    // TODO: Would be more correct to generate TableRows using setF.
     TableRow tableRow = new TableRow();
     for (Map.Entry<FieldDescriptor, Object> field : message.getAllFields().entrySet()) {
       StringBuilder fullName = new StringBuilder();
       FieldDescriptor fieldDescriptor = field.getKey();
       String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+      if ("f".equals(fieldName)) {
+        // TableRow.put won't work as expected if the field is named "f". Fail the call, and
+        // force a retry using the setF codepath.
+        return Optional.empty();
+      }
       fullName = fullName.append(namePrefix).append(fieldName);
       Object fieldValue = field.getValue();
       if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
           && includeField.test(fieldName)) {
-        tableRow.put(
-            fieldName,
+        SchemaInformation fieldSchemaInformation =
+            schemaInformation.getSchemaForField(fieldName);
+        Object convertedFieldValue =
             jsonValueFromMessageValue(
-                fieldDescriptor, fieldValue, true, includeField, fullName.append(".").toString()));
+                fieldSchemaInformation,
+                fieldDescriptor,
+                fieldValue,
+                true,
+                includeField,
+                fullName.append(".").toString(),
+                false);
+        if (convertedFieldValue instanceof Optional) {
+          Optional<?> optional = (Optional<?>) convertedFieldValue;
+          if (!optional.isPresent()) {
+            // Some nested message had a field named "f". Fail.
+            return Optional.empty();
+          } else {
+            convertedFieldValue = optional.get();
+          }
+        }
+        tableRow.put(fieldName, convertedFieldValue);
+      }
+    }
+    return Optional.of(tableRow);
+  }
+
+  public static TableRow tableRowFromMessageUseSetF(
+      SchemaInformation schemaInformation,
+      Message message,
+      boolean includeCdcColumns,
+      Predicate<String> includeField,
+      String namePrefix) {
+    List<TableCell> tableCells =
+        Lists.newArrayListWithCapacity(message.getDescriptorForType().getFields().size());
+
+    for (FieldDescriptor fieldDescriptor : message.getDescriptorForType().getFields()) {
+      TableCell tableCell = new TableCell();
+      boolean isPresent =
+          (fieldDescriptor.isRepeated() && message.getRepeatedFieldCount(fieldDescriptor) > 0)
+              || (!fieldDescriptor.isRepeated() && message.hasField(fieldDescriptor));
+      if (isPresent) {
+        StringBuilder fullName = new StringBuilder();
+        String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+        fullName = fullName.append(namePrefix).append(fieldName);
+        if ((includeCdcColumns || !StorageApiCDC.COLUMNS.contains(fullName.toString()))
+            && includeField.test(fieldName)) {
+          SchemaInformation fieldSchemaInformation =
+              schemaInformation.getSchemaForField(fieldName);
+          Object fieldValue = message.getField(fieldDescriptor);
+          Object converted =
+              jsonValueFromMessageValue(
+                  fieldSchemaInformation,
+                  fieldDescriptor,
+                  fieldValue,
+                  true,
+                  includeField,
+                  fullName.append(".").toString(),
+                  true);
+          tableCell.setV(converted);
+        }
       }
+      tableCells.add(tableCell);
     }
+
+    TableRow tableRow = new TableRow();
+    tableRow.setF(tableCells);
+
     return tableRow;
   }
+  // Our process for generating descriptors modifies the names of nested descriptors for wrapper
+  // types, so we record them here.
+  private static final Set<String> FLOAT_VALUE_DESCRIPTOR_NAMES =
+      ImmutableSet.of("google_protobuf_FloatValue", "FloatValue");
+  private static final Set<String> DOUBLE_VALUE_DESCRIPTOR_NAMES =
+      ImmutableSet.of("google_protobuf_DoubleValue", "DoubleValue");
+  private static final Set<String> BOOL_VALUE_DESCRIPTOR_NAMES =
+      ImmutableSet.of("google_protobuf_BoolValue", "BoolValue");
+  private static final Set<String> INT32_VALUE_DESCRIPTOR_NAMES =
+      ImmutableSet.of("google_protobuf_Int32Value", "Int32Value");
+  private static final Set<String> INT64_VALUE_DESCRIPTOR_NAMES =
+      ImmutableSet.of("google_protobuf_Int64Value", "Int64Value");
+  private static final Set<String> UINT32_VALUE_DESCRIPTOR_NAMES =
+      ImmutableSet.of("google_protobuf_UInt32Value", "UInt32Value");
+  private static final Set<String> UINT64_VALUE_DESCRIPTOR_NAMES =
+      ImmutableSet.of("google_protobuf_UInt64Value", "UInt64Value");
+  private static final Set<String> BYTES_VALUE_DESCRIPTOR_NAMES =
+      ImmutableSet.of("google_protobuf_BytesValue", "BytesValue");
+  private static final Set<String> TIMESTAMP_VALUE_DESCRIPTOR_NAMES =
+      ImmutableSet.of("google_protobuf_Timestamp", "Timestamp");
+  // Translate a proto message value into a JSON value. If useSetF==false, this will fail with
+  // Optional.empty() if any fields named "f" are found (due to restrictions on the TableRow
+  // class). In that case, the top level will retry with useSetF==true. We fall back this way
+  // in order to maintain backwards compatibility with existing users.
   public static Object jsonValueFromMessageValue(
+      SchemaInformation schemaInformation,
       FieldDescriptor fieldDescriptor,
       Object fieldValue,
       boolean expandRepeated,
       Predicate<String> includeField,
-      String prefix) {
+      String prefix,
+      boolean useSetF) {
     if (expandRepeated && fieldDescriptor.isRepeated()) {
       List<Object> valueList = (List<Object>) fieldValue;
-      return valueList.stream()
-          .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false, includeField, prefix))
-          .collect(toList());
+      List<Object> expanded = Lists.newArrayListWithCapacity(valueList.size());
+      for (Object value : valueList) {
+        Object translatedValue =
+            jsonValueFromMessageValue(
+                schemaInformation, fieldDescriptor, value, false, includeField, prefix, useSetF);
+        if (!useSetF && translatedValue instanceof Optional) {
+          Optional<?> optional = (Optional<?>) translatedValue;
+          if (!optional.isPresent()) {
+            // A nested element contained an "f" column. Fail the call.
+            return Optional.empty();
+          }
+          translatedValue = optional.get();
+        }
+        expanded.add(translatedValue);
+      }
+      return expanded;
     }
-    switch (fieldDescriptor.getType()) {
-      case GROUP:
-      case MESSAGE:
-        return tableRowFromMessage((Message) fieldValue, false, includeField, prefix);
-      case BYTES:
-        return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
-      case ENUM:
-        throw new RuntimeException("Enumerations not supported");
-      case INT32:
-      case FLOAT:
-      case BOOL:
+    // BigQueryIO supports direct proto writes - i.e. we allow the user to pass in their own
+    // proto and skip our conversion layer, as long as the proto conforms to the types supported
+    // by the BigQuery Storage Write API.
+    // For many schema types, the Storage Write API supports different proto field types (often
+    // with different encodings), so the mapping of schema type -> proto type is one to many.
+    // To read the data out of the proto, we need to examine both the schema type and the proto
+    // field type.
+    switch (schemaInformation.getType()) {
       case DOUBLE:
+        switch (fieldDescriptor.getType()) {
+          case FLOAT:
+          case DOUBLE:
+          case STRING:
+            return DECIMAL_FORMAT.format(Double.parseDouble(fieldValue.toString()));
+          case MESSAGE:
+            // Handle the various number wrapper types.
+            Message doubleMessage = (Message) fieldValue;
+            if (FLOAT_VALUE_DESCRIPTOR_NAMES.contains(
+                fieldDescriptor.getMessageType().getName())) {
+              float floatValue =
+                  (float)
+                      doubleMessage.getField(
+                          doubleMessage.getDescriptorForType().findFieldByName("value"));
+              return DECIMAL_FORMAT.format(floatValue);
+            } else if (DOUBLE_VALUE_DESCRIPTOR_NAMES.contains(
+                fieldDescriptor.getMessageType().getName())) {
+              double doubleValue =
+                  (double)
+                      doubleMessage.getField(
+                          doubleMessage.getDescriptorForType().findFieldByName("value"));
+              return DECIMAL_FORMAT.format(doubleValue);
+            } else {
+              throw new RuntimeException(
+                  "Not implemented yet " + fieldDescriptor.getMessageType().getName());
+            }
+          default:
+            return fieldValue.toString();
+        }
+      case BOOL:
+        // Wrapper type.
+        if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
+          Message boolMessage = (Message) fieldValue;
+          if (BOOL_VALUE_DESCRIPTOR_NAMES.contains(
+              fieldDescriptor.getMessageType().getName())) {
+            return boolMessage
+                .getField(boolMessage.getDescriptorForType().findFieldByName("value"))
+                .toString();
+          } else {
+            throw new RuntimeException(
+                "Not implemented yet " + fieldDescriptor.getMessageType().getName());
+          }
+        }
+        return fieldValue.toString();
+      case JSON:
+      case GEOGRAPHY:
         // The above types have native representations in JSON for all their
         // possible values.
-        return fieldValue;
       case STRING:
+        return fieldValue.toString();
       case INT64:
+        switch (fieldDescriptor.getType()) {
+          case MESSAGE:
+            // Wrapper types.
+            Message message = (Message) fieldValue;
+            if (INT32_VALUE_DESCRIPTOR_NAMES.contains(
+                fieldDescriptor.getMessageType().getName())) {
+              return message
+                  .getField(message.getDescriptorForType().findFieldByName("value"))
+                  .toString();
+            } else if (INT64_VALUE_DESCRIPTOR_NAMES.contains(
+                fieldDescriptor.getMessageType().getName())) {
+              return message
+                  .getField(message.getDescriptorForType().findFieldByName("value"))
+                  .toString();
+            } else if (UINT32_VALUE_DESCRIPTOR_NAMES.contains(
+                fieldDescriptor.getMessageType().getName())) {
+              return message
+                  .getField(message.getDescriptorForType().findFieldByName("value"))
+                  .toString();
+            } else if (UINT64_VALUE_DESCRIPTOR_NAMES.contains(
+                fieldDescriptor.getMessageType().getName())) {
+              return message
+                  .getField(message.getDescriptorForType().findFieldByName("value"))
+                  .toString();
+            } else {
+              throw new RuntimeException(
+                  "Not implemented yet " + fieldDescriptor.getMessageType().getFullName());
+            }
+          default:
+            return fieldValue.toString();
+        }
+      case BYTES:
+        switch (fieldDescriptor.getType()) {
+          case BYTES:
+            return BaseEncoding.base64().encode(((ByteString) fieldValue).toByteArray());
+          case STRING:
+            return BaseEncoding.base64()
+                .encode(((String) fieldValue).getBytes(StandardCharsets.UTF_8));
+          case MESSAGE:
+            Message message = (Message) fieldValue;
+            if (BYTES_VALUE_DESCRIPTOR_NAMES.contains(
+                fieldDescriptor.getMessageType().getName())) {
+              ByteString byteString =
+                  (ByteString)
+                      message.getField(message.getDescriptorForType().findFieldByName("value"));
+              return BaseEncoding.base64().encode(byteString.toByteArray());
+            }
+            throw new RuntimeException(
+                "Not implemented " + fieldDescriptor.getMessageType().getFullName());
+          default:
+            return fieldValue.toString();
+        }
+      case TIMESTAMP:
+        if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
+          long epochMicros = Long.valueOf(fieldValue.toString());
+          long epochSeconds = epochMicros / 1_000_000L;
+          long nanoAdjustment = (epochMicros % 1_000_000L) * 1_000L;
+          Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
+          return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
+        } else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
+          Message message = (Message) fieldValue;
+          if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains(
+              fieldDescriptor.getMessageType().getName())) {
+            Descriptor descriptor = message.getDescriptorForType();
+            long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
+            int nanos = (int) message.getField(descriptor.findFieldByName("nanos"));
+            Instant instant = Instant.ofEpochSecond(seconds, nanos);
+            return LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(TIMESTAMP_FORMATTER);
+          } else {
+            throw new RuntimeException(
+                "Not implemented yet " + fieldDescriptor.getMessageType().getFullName());
+          }
+        } else {
+          return fieldValue.toString();
+        }
+
Review Comment:

The `TIMESTAMP_FORMATTER` from `BigQueryUtils` is designed for parsing
various timestamp formats, not for formatting them into a canonical string
representation. Using it for formatting can produce incorrect output (e.g.,
`2022-01-0110:00:00` without a 'T' separator) because `optionalStart()` has no
effect during formatting.
To ensure correct and consistent formatting, you should use a formatter
designed for output, such as `BigQueryUtils.BIGQUERY_DATETIME_FORMATTER`. This
would also align with how `DATETIME` fields are handled in this class.
You'll need to add the following import:
```java
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils.BIGQUERY_DATETIME_FORMATTER;
```
```java
      case TIMESTAMP:
        if (isProtoFieldTypeInteger(fieldDescriptor.getType())) {
          long epochMicros = Long.valueOf(fieldValue.toString());
          long epochSeconds = epochMicros / 1_000_000L;
          long nanoAdjustment = (epochMicros % 1_000_000L) * 1_000L;
          Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);
          return LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
              .format(BIGQUERY_DATETIME_FORMATTER);
        } else if (fieldDescriptor.getType().equals(FieldDescriptor.Type.MESSAGE)) {
          Message message = (Message) fieldValue;
          if (TIMESTAMP_VALUE_DESCRIPTOR_NAMES.contains(
              fieldDescriptor.getMessageType().getName())) {
            Descriptor descriptor = message.getDescriptorForType();
            long seconds = (long) message.getField(descriptor.findFieldByName("seconds"));
            int nanos = (int) message.getField(descriptor.findFieldByName("nanos"));
            Instant instant = Instant.ofEpochSecond(seconds, nanos);
            return LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
                .format(BIGQUERY_DATETIME_FORMATTER);
          } else {
            throw new RuntimeException(
                "Not implemented yet " + fieldDescriptor.getMessageType().getFullName());
          }
        } else {
          return fieldValue.toString();
        }
```
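
As a sanity check, here is a minimal standalone sketch of the conversion path this suggestion recommends. The output pattern below is an assumption used as a stand-in for `BIGQUERY_DATETIME_FORMATTER`, whose actual definition lives in `BigQueryUtils`:
```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TimestampFormatDemo {
  // Stand-in output formatter; the real constant is BigQueryUtils.BIGQUERY_DATETIME_FORMATTER
  // and its exact pattern may differ.
  private static final DateTimeFormatter OUTPUT_FORMATTER =
      DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSSSS");

  public static void main(String[] args) {
    // Same epoch-micros decomposition as the suggested code above.
    long epochMicros = 1_641_031_200_000_000L; // 2022-01-01T10:00:00 UTC
    long epochSeconds = epochMicros / 1_000_000L;
    long nanoAdjustment = (epochMicros % 1_000_000L) * 1_000L;
    Instant instant = Instant.ofEpochSecond(epochSeconds, nanoAdjustment);

    // An explicit output pattern always emits the 'T' separator.
    System.out.println(LocalDateTime.ofInstant(instant, ZoneOffset.UTC).format(OUTPUT_FORMATTER));
    // prints "2022-01-01T10:00:00.000000"
  }
}
```
Using a dedicated output formatter keeps formatting deterministic even if the parsing formatter's optional sections change later.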