gemini-code-assist[bot] commented on code in PR #38772:
URL: https://github.com/apache/beam/pull/38772#discussion_r3467765967


##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java:
##########
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import com.google.auto.service.AutoService;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
+import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.schemas.utils.JsonUtils;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.bson.Document;
+
+/** An implementation of {@link TypedSchemaTransformProvider} for reading from 
MongoDB. */
+@AutoService(SchemaTransformProvider.class)
+public class MongoDbReadSchemaTransformProvider
+    extends 
TypedSchemaTransformProvider<MongoDbReadSchemaTransformConfiguration> {
+
+  private static final String OUTPUT_TAG_NAME = "output";
+  public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
+  public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
+
+  private static final org.apache.beam.sdk.metrics.Counter errorCounter =
+      org.apache.beam.sdk.metrics.Metrics.counter(
+          MongoDbReadSchemaTransformProvider.class, 
"MongoDB-read-error-counter");
+
+  @Override
+  protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration 
configuration) {
+    return new MongoDbReadSchemaTransform(configuration);
+  }
+
+  @Override
+  public String identifier() {
+    return "beam:schematransform:org.apache.beam:mongodb_read:v1";
+  }
+
+  @Override
+  public List<String> inputCollectionNames() {
+    return Collections.emptyList();
+  }
+
+  @Override
+  public List<String> outputCollectionNames() {
+    return Collections.singletonList(OUTPUT_TAG_NAME);
+  }
+
+  /** The {@link SchemaTransform} that performs the read operation. */
+  private static class MongoDbReadSchemaTransform extends SchemaTransform {
+    private final MongoDbReadSchemaTransformConfiguration configuration;
+
+    MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration 
configuration) {
+      configuration.validate();
+      this.configuration = configuration;
+    }
+
+    @Override
+    public PCollectionRowTuple expand(PCollectionRowTuple input) {
+      Schema schema = 
JsonUtils.beamSchemaFromJsonSchema(configuration.getSchema());
+
+      MongoDbIO.Read read =
+          MongoDbIO.read()
+              .withUri(configuration.getUri())
+              .withDatabase(configuration.getDatabase())
+              .withCollection(configuration.getCollection());
+
+      final String filterStr = configuration.getFilter();
+      if (filterStr != null) {
+        read = 
read.withQueryFn(FindQuery.create().withFilters(Document.parse(filterStr)));
+      }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Using `Document.parse(filterStr)` returns an `org.bson.Document`, which is 
not serializable and does not match the `BsonDocument` type expected by 
`FindQuery.withFilters()`. This can lead to compilation errors or 
`NotSerializableException` at runtime when running on distributed runners. Use 
`BsonDocument.parse(filterStr)` instead.
   
   ```suggestion
         final String filterStr = configuration.getFilter();
         if (filterStr != null) {
           read = 
read.withQueryFn(FindQuery.create().withFilters(org.bson.BsonDocument.parse(filterStr)));
         }
   ```



##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java:
##########
@@ -71,4 +76,127 @@ public static Document toDocument(Row row) {
     }
     return value;
   }
+
+  /**
+   * Converts a BSON {@link Document} (or any Map representing fields) to a 
Beam {@link Row}
+   * matching the given {@link Schema}.
+   */
+  public static Row toRow(Map<?, ?> doc, Schema schema) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    for (Field field : schema.getFields()) {
+      Object value = doc.get(field.getName());
+      rowBuilder.addValue(convertFromBsonValue(value, field.getType()));
+    }
+    return rowBuilder.build();
+  }
+
+  @SuppressWarnings("JavaUtilDate")
+  private static @Nullable Object convertFromBsonValue(
+      @Nullable Object value, FieldType fieldType) {
+    if (value == null || value instanceof BsonNull) {
+      return null;
+    }
+
+    switch (fieldType.getTypeName()) {
+      case BYTE:
+        return (value instanceof Number)
+            ? ((Number) value).byteValue()
+            : Byte.parseByte(value.toString());
+      case INT16:
+        return (value instanceof Number)
+            ? ((Number) value).shortValue()
+            : Short.parseShort(value.toString());
+      case INT32:
+        return (value instanceof Number)
+            ? ((Number) value).intValue()
+            : Integer.parseInt(value.toString());
+      case INT64:
+        return (value instanceof Number)
+            ? ((Number) value).longValue()
+            : Long.parseLong(value.toString());
+      case FLOAT:
+        return (value instanceof Number)
+            ? ((Number) value).floatValue()
+            : Float.parseFloat(value.toString());
+      case DOUBLE:
+        return (value instanceof Number)
+            ? ((Number) value).doubleValue()
+            : Double.parseDouble(value.toString());
+      case DECIMAL:
+        return (value instanceof Number)
+            ? java.math.BigDecimal.valueOf(((Number) value).doubleValue())
+            : new java.math.BigDecimal(value.toString());
+      case STRING:
+        return value.toString();
+      case BOOLEAN:
+        return (value instanceof Boolean)
+            ? (Boolean) value
+            : Boolean.parseBoolean(value.toString());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `Boolean.parseBoolean(value.toString())` for non-Boolean values 
returns `false` for any string other than `"true"` (ignoring case). If a 
document stores boolean flags as numbers (e.g., `1` for `true` and `0` for 
`false`), `1` is therefore silently converted to `false`. It is safer to 
handle `Number` values explicitly.
   
   ```java
         case BOOLEAN:
           if (value instanceof Boolean) {
             return (Boolean) value;
           } else if (value instanceof Number) {
             return ((Number) value).intValue() != 0;
           } else {
             return Boolean.parseBoolean(value.toString());
           }
   ```



##########
sdks/java/io/mongodb/src/test/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProviderTest.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.SchemaRegistry;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionRowTuple;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.bson.Document;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link MongoDbReadSchemaTransformProvider}. */
+@RunWith(JUnit4.class)
+public class MongoDbReadSchemaTransformProviderTest {
+
+  @Rule public transient TestPipeline p = TestPipeline.create();
+
+  @Test
+  public void testInvalidConfigMissingUri() {
+    assertThrows(
+        IllegalStateException.class,
+        () -> {
+          MongoDbReadSchemaTransformConfiguration.builder()
+              .setDatabase("db")
+              .setCollection("col")
+              .setSchema("{}")
+              .build()
+              .validate();
+        });
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This test (and the subsequent ones for missing database, collection, and 
schema) asserts that `IllegalStateException` is thrown. However, that exception 
comes from AutoValue's generated `build()` method, which rejects the unset 
required property, so `validate()` is never reached and remains untested. To 
exercise the empty-string checks in `validate()`, set an empty string on the 
builder and assert that `IllegalArgumentException` is thrown.
   
   ```suggestion
     @Test
     public void testInvalidConfigEmptyUri() {
       assertThrows(
           IllegalArgumentException.class,
           () -> {
             MongoDbReadSchemaTransformConfiguration.builder()
                 .setUri("")
                 .setDatabase("db")
                 .setCollection("col")
                 .setSchema("{}")
                 .build()
                 .validate();
           });
     }
   ```



##########
sdks/python/apache_beam/yaml/yaml_io.py:
##########
@@ -739,6 +741,63 @@ def write_to_tfrecord(
           compression_type=getattr(CompressionTypes, compression_type))
 
 
+@beam.ptransform_fn
+@yaml_errors.maybe_with_exception_handling_transform_fn
+def read_from_mongodb(
+    root,
+    *,
+    database: str,
+    collection: str,
+    schema: Union[str, dict[str, Any]],
+    connection_uri: Optional[str] = None,
+    filter: Optional[dict[str, Any]] = None,
+    projection: Optional[Union[list[str], dict[str, Any]]] = None,
+    extra_client_params: Optional[dict[str, Any]] = None,
+    bucket_auto: bool = False):
+  """Reads data from MongoDB.
+
+  The resulting PCollection consists of rows with fields matching the provided
+  schema.
+
+  Args:
+    database: The MongoDB database name.
+    collection: The MongoDB collection name.
+    schema: JSON schema specifying the fields to select and their types.
+    connection_uri: The MongoDB connection string. e.g. 
"mongodb://localhost:27017"
+    filter: A JSON/bson mapping specifying elements which must be present.
+    projection: A list of field names that should be returned or a dict
+      specifying the fields to include/exclude.
+    extra_client_params: Optional MongoClient parameters.
+    bucket_auto: If True, use MongoDB $bucketAuto aggregation to split
+      collection into bundles instead of splitVector command.
+  """
+  if isinstance(schema, str):
+    schema = json.loads(schema)
+  if isinstance(filter, str):
+    filter = json.loads(filter)
+  if isinstance(projection, str):
+    projection = json.loads(projection)
+
+  beam_schema = json_utils.json_schema_to_beam_schema(schema)
+  beam_type = schema_pb2.FieldType(
+      row_type=schema_pb2.RowType(schema=beam_schema))
+  to_row_fn = json_utils.json_to_row(beam_type)
+
+  output = (
+      root
+      | mongodbio.ReadFromMongoDB(
+          uri=connection_uri,
+          db=database,
+          coll=collection,
+          filter=filter,
+          projection=projection,
+          extra_client_params=extra_client_params,
+          bucket_auto=bucket_auto)
+      | beam.Map(to_row_fn))

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `mongodbio.ReadFromMongoDB` transform returns native BSON types (such as 
`bson.objectid.ObjectId` for `_id` and `datetime.datetime` for dates). Passing 
these directly to `json_utils.json_to_row` will result in `TypeError` or 
conversion failures if those fields are included in the schema, because 
`json_to_row` expects standard JSON types. Consider adding a normalization step 
to convert BSON-specific types (like `ObjectId` to `str`) before passing them 
to `to_row_fn`.



##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformConfiguration.java:
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.mongodb;
+
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
+
+import com.google.auto.value.AutoValue;
+import java.io.Serializable;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Configuration class for the MongoDB Read transform. */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class MongoDbReadSchemaTransformConfiguration implements 
Serializable {
+
+  @SchemaFieldDescription("The connection URI for the MongoDB server.")
+  public abstract String getUri();
+
+  @SchemaFieldDescription("The MongoDB database to read from.")
+  public abstract String getDatabase();
+
+  @SchemaFieldDescription("The MongoDB collection to read from.")
+  public abstract String getCollection();
+
+  @SchemaFieldDescription(
+      "The schema in which the data is encoded, defined with JSON-schema 
syntax (https://json-schema.org/).")
+  public abstract String getSchema();
+
+  @SchemaFieldDescription(
+      "An optional BSON filter to apply to the read. This should be a valid 
JSON string.")
+  @Nullable
+  public abstract String getFilter();
+
+  @SchemaFieldDescription(
+      "This option specifies whether and where to output rows that failed to 
be read.")
+  @Nullable
+  public abstract ErrorHandling getErrorHandling();
+
+  public void validate() {
+    checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must 
be specified.");
+    checkArgument(
+        getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database 
must be specified.");
+    checkArgument(
+        getCollection() != null && !getCollection().isEmpty(),
+        "MongoDB collection must be specified.");
+    checkArgument(
+        getSchema() != null && !getSchema().isEmpty(), "MongoDB schema must be 
specified.");
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Since `getUri()`, `getDatabase()`, `getCollection()`, and `getSchema()` are 
not annotated with `@Nullable`, the AutoValue-generated builder already throws 
a `NullPointerException` or `IllegalStateException` if any of these properties 
is null or unset. The null checks in `validate()` are therefore redundant, and 
the method can be simplified to check only for empty strings.
   
   ```java
     public void validate() {
       checkArgument(!getUri().isEmpty(), "MongoDB URI must be specified.");
       checkArgument(!getDatabase().isEmpty(), "MongoDB database must be 
specified.");
       checkArgument(!getCollection().isEmpty(), "MongoDB collection must be 
specified.");
       checkArgument(!getSchema().isEmpty(), "MongoDB schema must be 
specified.");
     }
   ```



##########
sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbUtils.java:
##########
@@ -71,4 +76,127 @@ public static Document toDocument(Row row) {
     }
     return value;
   }
+
+  /**
+   * Converts a BSON {@link Document} (or any Map representing fields) to a 
Beam {@link Row}
+   * matching the given {@link Schema}.
+   */
+  public static Row toRow(Map<?, ?> doc, Schema schema) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    for (Field field : schema.getFields()) {
+      Object value = doc.get(field.getName());
+      rowBuilder.addValue(convertFromBsonValue(value, field.getType()));
+    }
+    return rowBuilder.build();
+  }
+
+  @SuppressWarnings("JavaUtilDate")
+  private static @Nullable Object convertFromBsonValue(
+      @Nullable Object value, FieldType fieldType) {
+    if (value == null || value instanceof BsonNull) {
+      return null;
+    }
+
+    switch (fieldType.getTypeName()) {
+      case BYTE:
+        return (value instanceof Number)
+            ? ((Number) value).byteValue()
+            : Byte.parseByte(value.toString());
+      case INT16:
+        return (value instanceof Number)
+            ? ((Number) value).shortValue()
+            : Short.parseShort(value.toString());
+      case INT32:
+        return (value instanceof Number)
+            ? ((Number) value).intValue()
+            : Integer.parseInt(value.toString());
+      case INT64:
+        return (value instanceof Number)
+            ? ((Number) value).longValue()
+            : Long.parseLong(value.toString());
+      case FLOAT:
+        return (value instanceof Number)
+            ? ((Number) value).floatValue()
+            : Float.parseFloat(value.toString());
+      case DOUBLE:
+        return (value instanceof Number)
+            ? ((Number) value).doubleValue()
+            : Double.parseDouble(value.toString());
+      case DECIMAL:
+        return (value instanceof Number)
+            ? java.math.BigDecimal.valueOf(((Number) value).doubleValue())
+            : new java.math.BigDecimal(value.toString());
+      case STRING:
+        return value.toString();
+      case BOOLEAN:
+        return (value instanceof Boolean)
+            ? (Boolean) value
+            : Boolean.parseBoolean(value.toString());
+      case DATETIME:
+        if (value instanceof java.util.Date) {
+          return new Instant(((java.util.Date) value).getTime());
+        } else if (value instanceof Number) {
+          return new Instant(((Number) value).longValue());
+        } else {
+          return Instant.parse(value.toString());
+        }
+      case BYTES:
+        if (value instanceof Binary) {
+          return ((Binary) value).getData();
+        } else if (value instanceof byte[]) {
+          return (byte[]) value;
+        } else {
+          return 
value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   When converting an `ObjectId` (such as the default `_id` field in MongoDB) 
to `BYTES`, calling `value.toString().getBytes(...)` will return the 24-byte 
UTF-8 representation of the hex string instead of the actual 12-byte binary 
representation. It is highly recommended to handle `org.bson.types.ObjectId` 
explicitly by calling `toByteArray()`.
   
   ```suggestion
         case BYTES:
           if (value instanceof Binary) {
             return ((Binary) value).getData();
           } else if (value instanceof byte[]) {
             return (byte[]) value;
           } else if (value instanceof org.bson.types.ObjectId) {
             return ((org.bson.types.ObjectId) value).toByteArray();
           } else {
             return 
value.toString().getBytes(java.nio.charset.StandardCharsets.UTF_8);
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to