gemini-code-assist[bot] commented on code in PR #36425:
URL: https://github.com/apache/beam/pull/36425#discussion_r2423096861
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowToStorageApiProto.java:
##########
@@ -1128,68 +1216,376 @@ private static long toEpochMicros(Instant timestamp) {
@VisibleForTesting
public static TableRow tableRowFromMessage(
- Message message, boolean includeCdcColumns, Predicate<String>
includeField) {
- return tableRowFromMessage(message, includeCdcColumns, includeField, "");
+ SchemaInformation schemaInformation,
+ Message message,
+ boolean includeCdcColumns,
+ Predicate<String> includeField) {
+ return tableRowFromMessage(schemaInformation, message, includeCdcColumns,
includeField, "");
}
public static TableRow tableRowFromMessage(
+ SchemaInformation schemaInformation,
+ Message message,
+ boolean includeCdcColumns,
+ Predicate<String> includeField,
+ String namePrefix) {
+    // We first try to create a map-style TableRow for backwards compatibility
+    // with existing usage. However, this will fail if there is a column named
+    // "f". If it fails, we then instead create a list-style TableRow.
+ Optional<TableRow> tableRow =
+ tableRowFromMessageNoF(
+ schemaInformation, message, includeCdcColumns, includeField,
namePrefix);
+ return tableRow.orElseGet(
+ () ->
+ tableRowFromMessageUseSetF(
+ schemaInformation, message, includeCdcColumns, includeField,
""));
+ }
+
+ private static Optional<TableRow> tableRowFromMessageNoF(
+ SchemaInformation schemaInformation,
Message message,
boolean includeCdcColumns,
Predicate<String> includeField,
String namePrefix) {
- // TODO: Would be more correct to generate TableRows using setF.
TableRow tableRow = new TableRow();
for (Map.Entry<FieldDescriptor, Object> field :
message.getAllFields().entrySet()) {
StringBuilder fullName = new StringBuilder();
FieldDescriptor fieldDescriptor = field.getKey();
String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+ if ("f".equals(fieldName)) {
+        // TableRow.put won't work as expected if the field is named "f".
+        // Fail the call and force a retry using the setF codepath.
+ return Optional.empty();
+ }
fullName = fullName.append(namePrefix).append(fieldName);
Object fieldValue = field.getValue();
if ((includeCdcColumns ||
!StorageApiCDC.COLUMNS.contains(fullName.toString()))
&& includeField.test(fieldName)) {
- tableRow.put(
- fieldName,
+ SchemaInformation fieldSchemaInformation =
schemaInformation.getSchemaForField(fieldName);
+ Object convertedFieldValue =
jsonValueFromMessageValue(
- fieldDescriptor, fieldValue, true, includeField,
fullName.append(".").toString()));
+ fieldSchemaInformation,
+ fieldDescriptor,
+ fieldValue,
+ true,
+ includeField,
+ fullName.append(".").toString(),
+ false);
+ if (convertedFieldValue instanceof Optional) {
+ Optional<?> optional = (Optional<?>) convertedFieldValue;
+ if (!optional.isPresent()) {
+ // Some nested message had a field named "f." Fail.
+ return Optional.empty();
+ } else {
+ convertedFieldValue = optional.get();
+ }
+ }
+ tableRow.put(fieldName, convertedFieldValue);
+ }
+ }
+ return Optional.of(tableRow);
+ }
+
+ public static TableRow tableRowFromMessageUseSetF(
+ SchemaInformation schemaInformation,
+ Message message,
+ boolean includeCdcColumns,
+ Predicate<String> includeField,
+ String namePrefix) {
+ List<TableCell> tableCells =
+
Lists.newArrayListWithCapacity(message.getDescriptorForType().getFields().size());
+
+ for (FieldDescriptor fieldDescriptor :
message.getDescriptorForType().getFields()) {
+ TableCell tableCell = new TableCell();
+ boolean isPresent =
+ (fieldDescriptor.isRepeated() &&
message.getRepeatedFieldCount(fieldDescriptor) > 0)
+ || (!fieldDescriptor.isRepeated() &&
message.hasField(fieldDescriptor));
+ if (isPresent) {
+ StringBuilder fullName = new StringBuilder();
+ String fieldName = fieldNameFromProtoFieldDescriptor(fieldDescriptor);
+ fullName = fullName.append(namePrefix).append(fieldName);
+ if ((includeCdcColumns ||
!StorageApiCDC.COLUMNS.contains(fullName.toString()))
+ && includeField.test(fieldName)) {
+ SchemaInformation fieldSchemaInformation =
schemaInformation.getSchemaForField(fieldName);
+ Object fieldValue = message.getField(fieldDescriptor);
+ Object converted =
+ jsonValueFromMessageValue(
+ fieldSchemaInformation,
+ fieldDescriptor,
+ fieldValue,
+ true,
+ includeField,
+ fullName.append(".").toString(),
+ true);
+ tableCell.setV(converted);
+ }
}
+ tableCells.add(tableCell);
}
+
+ TableRow tableRow = new TableRow();
+ tableRow.setF(tableCells);
+
return tableRow;
}
+ // Our process for generating descriptors modifies the names of nested
descriptors for wrapper
+ // types, so we record them here.
+ private static String FLOAT_VALUE_DESCRIPTOR_NAME =
"google_protobuf_FloatValue";
+ private static String DOUBLE_VALUE_DESCRIPTOR_NAME =
"google_protobuf_DoubleValue";
+ private static String BOOL_VALUE_DESCRIPTOR_NAME =
"google_protobuf_BoolValue";
+ private static String INT32_VALUE_DESCRIPTOR_NAME =
"google_protobuf_Int32Value";
+ private static String INT64_VALUE_DESCRIPTOR_NAME =
"google_protobuf_Int64Value";
+ private static String UINT32_VALUE_DESCRIPTOR_NAME =
"google_protobuf_UInt32Value";
+ private static String UINT64_VALUE_DESCRIPTOR_NAME =
"google_protobuf_UInt64Value";
+ private static String BYTES_VALUE_DESCRIPTOR_NAME =
"google_protobuf_BytesValue";
+ private static String TIMESTAMP_VALUE_DESCRIPTOR_NAME =
"google_protobuf_Timestamp";
+
+  // Translate a proto message value into a json value. If useSetF==false,
+  // this will fail with Optional.empty() if any fields named "f" are found
+  // (due to restrictions on the TableRow class). In that case, the top level
+  // will retry with useSetF==true. We fall back this way in order to maintain
+  // backwards compatibility with existing users.
public static Object jsonValueFromMessageValue(
+ SchemaInformation schemaInformation,
FieldDescriptor fieldDescriptor,
Object fieldValue,
boolean expandRepeated,
Predicate<String> includeField,
- String prefix) {
+ String prefix,
+ boolean useSetF) {
if (expandRepeated && fieldDescriptor.isRepeated()) {
List<Object> valueList = (List<Object>) fieldValue;
- return valueList.stream()
- .map(v -> jsonValueFromMessageValue(fieldDescriptor, v, false,
includeField, prefix))
- .collect(toList());
+ List<Object> expanded = Lists.newArrayListWithCapacity(valueList.size());
+ for (Object value : valueList) {
+ Object translatedValue =
+ jsonValueFromMessageValue(
+ schemaInformation, fieldDescriptor, value, false,
includeField, prefix, useSetF);
+ if (!useSetF && translatedValue instanceof Optional) {
+ Optional<?> optional = (Optional<?>) translatedValue;
+ if (!optional.isPresent()) {
+ // A nested element contained an "f" column. Fail the call.
+ return Optional.empty();
+ }
+ translatedValue = optional.get();
+ }
+ expanded.add(translatedValue);
+ }
+ return expanded;
}
- switch (fieldDescriptor.getType()) {
- case GROUP:
- case MESSAGE:
- return tableRowFromMessage((Message) fieldValue, false, includeField,
prefix);
- case BYTES:
- return BaseEncoding.base64().encode(((ByteString)
fieldValue).toByteArray());
- case ENUM:
- throw new RuntimeException("Enumerations not supported");
- case INT32:
- case FLOAT:
- case BOOL:
+ // BigQueryIO supports direct proto writes - i.e. we allow the user to
pass in their own proto
+ // and skip our
+ // conversion layer, as long as the proto conforms to the types supported
by the BigQuery
+ // Storage Write API.
+ // For many schema types, the Storage Write API supports different proto
field types (often with
+ // different
+ // encodings), so the mapping of schema type -> proto type is one to many.
To read the data out
+ // of the proto,
+ // we need to examine both the schema type and the proto field type.
+ switch (schemaInformation.getType()) {
case DOUBLE:
+ switch (fieldDescriptor.getType()) {
+ case FLOAT:
+ case DOUBLE:
+ case STRING:
+ return
BigDecimal.valueOf(Double.parseDouble(fieldValue.toString()))
+ .stripTrailingZeros()
+ .toString();
+ case MESSAGE:
+ // Handle the various number wrapper types.
+ Message doubleMessage = (Message) fieldValue;
+ if
(fieldDescriptor.getMessageType().getName().equals(FLOAT_VALUE_DESCRIPTOR_NAME))
{
+ float floatValue =
+ (float)
+ doubleMessage.getField(
+
doubleMessage.getDescriptorForType().findFieldByName("value"));
+
+ return new
BigDecimal(Float.toString(floatValue)).stripTrailingZeros().toString();
+ } else if (fieldDescriptor
+ .getMessageType()
+ .getName()
+ .equals(DOUBLE_VALUE_DESCRIPTOR_NAME)) {
+ double doubleValue =
+ (double)
+ doubleMessage.getField(
+
doubleMessage.getDescriptorForType().findFieldByName("value"));
+ return
BigDecimal.valueOf(doubleValue).stripTrailingZeros().toString();
+ } else {
+ throw new RuntimeException(
+ "Not implemented yet "
+ + fieldDescriptor.getMessageType().getName()
+ + " PARTIAL NAME "
+ + fieldDescriptor.getMessageType().getName()
+ + " FIELD NAME "
+ + prefix
+ + " CLASS TYPE "
+ + fieldValue.getClass());
+ }
Review Comment:

The exception message here seems to contain debugging information ("PARTIAL
NAME", "FIELD NAME", "CLASS TYPE"). It would be better to provide a cleaner,
more user-friendly message. Also, consider using
`UnsupportedOperationException` instead of a generic `RuntimeException` for
cases that are not implemented. This applies to similar `throw new
RuntimeException("Not implemented...")` statements in this method for other
types like `BOOL`, `INT64`, `BYTES`, and `TIMESTAMP`.
```java
throw new UnsupportedOperationException(
"Converting BigQuery 'DOUBLE' from a protobuf message of type '"
+ fieldDescriptor.getMessageType().getName()
+ "' is not supported. Field: "
+ prefix);
```
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -3663,13 +3675,11 @@ public void testStorageApiErrorsWriteTableRows() throws
Exception {
.via(BigQueryStorageApiInsertError::getRow));
PCollection<TableRow> successfulRows =
result.getSuccessfulStorageApiInserts();
- List<TableRow> expectedFailedRows =
-
badRows.stream().map(formatRecordOnFailureFunction::apply).collect(Collectors.toList());
+ List<TableRow> expectedFailedRows = Lists.newArrayList(badRows);
expectedFailedRows.addAll(
- goodRows.stream()
- .filter(shouldFailRow::apply)
- .map(formatRecordOnFailureFunction::apply)
- .collect(Collectors.toList()));
+
goodRows.stream().filter(shouldFailRow::apply).collect(Collectors.toList()));
+
+ deadRows.apply(ParDo.of(new PrintingDoFn()));
Review Comment:

This line applies a `PrintingDoFn` that prints failed rows to `System.err`.
This seems to be a debugging artifact left in the test. Test verification
should be done via assertions (like `PAssert`), and printing to standard error
should be avoided in tests as it can pollute logs. This line should be removed.
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOWriteTest.java:
##########
@@ -3684,6 +3694,13 @@ public void testStorageApiErrorsWriteTableRows() throws
Exception {
Iterables.filter(goodRows, r -> !shouldFailRow.apply(r)),
TableRow.class)));
}
+ private static class PrintingDoFn extends DoFn<TableRow, Void> {
+ @ProcessElement
+ public void processElement(@Element TableRow element) {
+ System.err.println("FAILED " + element);
+ }
+ }
Review Comment:

The `PrintingDoFn` class appears to be a debugging utility that was left in
the test code. It prints to `System.err`, which is not ideal for automated
tests. This class should be removed along with its usage.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]