gemini-code-assist[bot] commented on code in PR #35794: URL: https://github.com/apache/beam/pull/35794#discussion_r2258154088
########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. + * + * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@AutoService(SchemaTransformProvider.class) +public class MongoDbReadSchemaTransformProvider + extends TypedSchemaTransformProvider< + MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> { + + private static final String OUTPUT_TAG = "output"; + + @Override + protected Class<MongoDbReadSchemaTransformConfiguration> configurationClass() { + return MongoDbReadSchemaTransformConfiguration.class; + } + + @Override + protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { + return new MongoDbReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + // Return a unique URN for the transform. + return "beam:schematransform:org.apache.beam:mongodb_read:v1"; + } + + @Override + public List<String> inputCollectionNames() { + // A read transform does not have an input PCollection. + return Collections.emptyList(); + } + + @Override + public List<String> outputCollectionNames() { + // The primary output is a PCollection of Rows. + // Error handling could be added later with a second "errors" output tag. + return Collections.singletonList(OUTPUT_TAG); + } + + /** Configuration class for the MongoDB Read transform. 
*/ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class MongoDbReadSchemaTransformConfiguration implements Serializable { + + @SchemaFieldDescription("The connection URI for the MongoDB server.") + public abstract String getUri(); + + @SchemaFieldDescription("The MongoDB database to read from.") + public abstract String getDatabase(); + + @SchemaFieldDescription("The MongoDB collection to read from.") + public abstract String getCollection(); + + @SchemaFieldDescription( + "An optional BSON filter to apply to the read. This should be a valid JSON string.") + @Nullable + public abstract String getFilter(); + + public void validate() { + checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); + checkArgument( + getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); + checkArgument( + getCollection() != null && !getCollection().isEmpty(), + "MongoDB collection must be specified."); + } + + public static Builder builder() { + return new AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration + .Builder(); + } + + /** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setUri(String uri); + + public abstract Builder setDatabase(String database); + + public abstract Builder setCollection(String collection); + + public abstract Builder setFilter(String filter); + + public abstract MongoDbReadSchemaTransformConfiguration build(); + } + } + + /** The {@link SchemaTransform} that performs the read operation. */ + private static class MongoDbReadSchemaTransform extends SchemaTransform { + private final MongoDbReadSchemaTransformConfiguration configuration; + + MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + // A read transform does not have an input, so we start with the pipeline. + PCollection<Document> mongoDocs = + input + .getPipeline() + .apply( + "ReadFromMongoDb", + MongoDbIO.read() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection())); + // TODO: Add support for .withFilter() if it exists in your MongoDbIO, + // using configuration.getFilter(). + + // Convert the BSON Document objects into Beam Row objects. + PCollection<Row> beamRows = + mongoDocs.apply("ConvertToBeamRows", ParDo.of(new MongoDocumentToRowFn())); + + return PCollectionRowTuple.of(OUTPUT_TAG, beamRows); + } + } + + /** + * A {@link DoFn} to convert a MongoDB {@link Document} to a Beam {@link Row}. + * + * <p>This is a critical step to ensure data is in a schema-aware format. + */ + private static class MongoDocumentToRowFn extends DoFn<Document, Row> { + // TODO: Define the Beam Schema that corresponds to your MongoDB documents. + // This could be made dynamic based on an inferred schema or a user-provided schema. + // For this skeleton, we assume a static schema. + // public static final Schema OUTPUT_SCHEMA = Schema.builder()...build(); + + @ProcessElement + public void processElement(@Element Document doc, OutputReceiver<Row> out) { + // Here you will convert the BSON document to a Beam Row. + // This requires you to know the target schema. 
+ + // Example pseudo-code: + // Row.Builder rowBuilder = Row.withSchema(OUTPUT_SCHEMA); + // for (Map.Entry<String, Object> entry : doc.entrySet()) { + // rowBuilder.addValue(entry.getValue()); + // } + // out.output(rowBuilder.build()); + + // For a robust implementation, you would handle data type conversions + // between BSON types and Beam schema types. + throw new UnsupportedOperationException( + "MongoDocumentToRowFn must be implemented to convert MongoDB Documents to Beam Rows."); + } Review Comment:  The `MongoDocumentToRowFn` is not implemented and throws an `UnsupportedOperationException`. This makes the entire `MongoDbReadSchemaTransformProvider` non-functional. The logic to convert a BSON `Document` to a Beam `Row` needs to be implemented. This will likely involve handling schema inference or requiring a user-provided schema. ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbReadSchemaTransformProvider.java: ########## @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for reading from MongoDB. + * + * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. 
+ */ +@AutoService(SchemaTransformProvider.class) +public class MongoDbReadSchemaTransformProvider + extends TypedSchemaTransformProvider< + MongoDbReadSchemaTransformProvider.MongoDbReadSchemaTransformConfiguration> { + + private static final String OUTPUT_TAG = "output"; + + @Override + protected Class<MongoDbReadSchemaTransformConfiguration> configurationClass() { + return MongoDbReadSchemaTransformConfiguration.class; + } + + @Override + protected SchemaTransform from(MongoDbReadSchemaTransformConfiguration configuration) { + return new MongoDbReadSchemaTransform(configuration); + } + + @Override + public String identifier() { + // Return a unique URN for the transform. + return "beam:schematransform:org.apache.beam:mongodb_read:v1"; + } + + @Override + public List<String> inputCollectionNames() { + // A read transform does not have an input PCollection. + return Collections.emptyList(); + } + + @Override + public List<String> outputCollectionNames() { + // The primary output is a PCollection of Rows. + // Error handling could be added later with a second "errors" output tag. + return Collections.singletonList(OUTPUT_TAG); + } + + /** Configuration class for the MongoDB Read transform. */ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class MongoDbReadSchemaTransformConfiguration implements Serializable { + + @SchemaFieldDescription("The connection URI for the MongoDB server.") + public abstract String getUri(); + + @SchemaFieldDescription("The MongoDB database to read from.") + public abstract String getDatabase(); + + @SchemaFieldDescription("The MongoDB collection to read from.") + public abstract String getCollection(); + + @SchemaFieldDescription( + "An optional BSON filter to apply to the read. This should be a valid JSON string.") + @Nullable + public abstract String getFilter(); + + public void validate() { + checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); + checkArgument( + getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); + checkArgument( + getCollection() != null && !getCollection().isEmpty(), + "MongoDB collection must be specified."); + } + + public static Builder builder() { + return new AutoValue_MongoDbReadSchemaTransformProvider_MongoDbReadSchemaTransformConfiguration + .Builder(); + } + + /** Builder for the {@link MongoDbReadSchemaTransformConfiguration}. */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setUri(String uri); + + public abstract Builder setDatabase(String database); + + public abstract Builder setCollection(String collection); + + public abstract Builder setFilter(String filter); + + public abstract MongoDbReadSchemaTransformConfiguration build(); + } + } + + /** The {@link SchemaTransform} that performs the read operation. */ + private static class MongoDbReadSchemaTransform extends SchemaTransform { + private final MongoDbReadSchemaTransformConfiguration configuration; + + MongoDbReadSchemaTransform(MongoDbReadSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + // A read transform does not have an input, so we start with the pipeline. 
+ PCollection<Document> mongoDocs = + input + .getPipeline() + .apply( + "ReadFromMongoDb", + MongoDbIO.read() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection())); + // TODO: Add support for .withFilter() if it exists in your MongoDbIO, + // using configuration.getFilter(). Review Comment:  The `filter` from the configuration is not being used. The `MongoDbIO.read()` transform should be configured with the filter provided in `configuration.getFilter()`. You'll need to parse the filter string into a Bson `Document`. ```suggestion MongoDbIO.Read read = MongoDbIO.read() .withUri(configuration.getUri()) .withDatabase(configuration.getDatabase()) .withCollection(configuration.getCollection()); if (configuration.getFilter() != null && !configuration.getFilter().isEmpty()) { read = read.withQueryFn( FindQuery.create().withFilters(Document.parse(configuration.getFilter()))); } PCollection<Document> mongoDocs = input.getPipeline().apply("ReadFromMongoDb", read); ``` ########## sdks/python/apache_beam/yaml/standard_io.yaml: ########## @@ -397,3 +397,26 @@ 'WriteToBigTable': 'beam:schematransform:org.apache.beam:bigtable_write:v1' config: gradle_target: 'sdks:java:io:google-cloud-platform:expansion-service:shadowJar' + +#MongoDB +- type: renaming + transforms: + 'ReadFromMongoDB': 'ReadFromMongoDB' + 'WriteToMongoDB': 'WriteToMongoDB' + config: + mappings: + 'ReadFromMongoDB': + connection_uri: "connection_uri" + database: "database" + collection: "collection" + 'WriteToMongoDB': + connection_uri: "connection_uri" + database: "database" + collection: "collection" + underlying_provider: + type: beamJar + transforms: + 'ReadFromBigTable': 'beam:schematransform:org.apache.beam:mongodb_read:v1' + 'WriteToBigTable': 'beam:schematransform:org.apache.beam:mongodb_write:v1' Review Comment:  There appears to be a copy-paste error. The transform names `ReadFromBigTable` and `WriteToBigTable` are used for the MongoDB provider. They should be `ReadFromMongoDB` and `WriteToMongoDB` to match the transform names defined for this provider. ``` 'ReadFromMongoDB': 'beam:schematransform:org.apache.beam:mongodb_read:v1' 'WriteToMongoDB': 'beam:schematransform:org.apache.beam:mongodb_write:v1' ``` ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB. + * + * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@AutoService(SchemaTransformProvider.class) +public class MongoDbWriteSchemaTransformProvider + extends TypedSchemaTransformProvider< + MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> { + + private static final String INPUT_TAG = "input"; + + @Override + protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) { + return new MongoDbWriteSchemaTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mongodb_write:v1"; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + /** Configuration class for the MongoDB Write transform. 
*/ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class MongoDbWriteSchemaTransformConfiguration implements Serializable { + + @SchemaFieldDescription("The connection URI for the MongoDB server.") + public abstract String getUri(); + + @SchemaFieldDescription("The MongoDB database to write to.") + public abstract String getDatabase(); + + @SchemaFieldDescription("The MongoDB collection to write to.") + public abstract String getCollection(); + + // @SchemaFieldDescription("The number of documents to include in each batch write.") + // @Nullable + // public abstract Long getBatchSize(); + // + // @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.") + // @Nullable + // public abstract Boolean getOrdered(); + + public void validate() { + checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); + checkArgument( + getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); + checkArgument( + getCollection() != null && !getCollection().isEmpty(), + "MongoDB collection must be specified."); + } + + public static Builder builder() { + return new AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration + .Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setUri(String uri); + + public abstract Builder setDatabase(String database); + + public abstract Builder setCollection(String collection); + + public abstract Builder setBatchSize(Long batchSize); + + public abstract Builder setOrdered(Boolean ordered); + + public abstract MongoDbWriteSchemaTransformConfiguration build(); + } + } + + /** The {@link SchemaTransform} that performs the write operation. */ + private static class MongoDbWriteSchemaTransform extends SchemaTransform { + private final MongoDbWriteSchemaTransformConfiguration configuration; + + MongoDbWriteSchemaTransform(MongoDbWriteSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + PCollection<Row> rows = input.get(INPUT_TAG); + + PCollection<Document> documents = + rows.apply("ConvertToDocument", ParDo.of(new RowToBsonDocumentFn())); + + MongoDbIO.Write write = + MongoDbIO.write() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection()); + + // if (configuration.getBatchSize() != null) { + // write = write.withBatchSize(configuration.getBatchSize()); + // } + // if (configuration.getOrdered() != null) { + // write = write.withOrdered(configuration.getOrdered()); + // } Review Comment:  The `batchSize` and `ordered` configurations are not being applied to the `MongoDbIO.Write` transform. Please uncomment this block to enable this functionality, assuming the corresponding configuration properties are also uncommented. ```suggestion if (configuration.getBatchSize() != null) { write = write.withBatchSize(configuration.getBatchSize()); } if (configuration.getOrdered() != null) { write = write.withOrdered(configuration.getOrdered()); } ``` ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB. + * + * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@AutoService(SchemaTransformProvider.class) +public class MongoDbWriteSchemaTransformProvider + extends TypedSchemaTransformProvider< + MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> { + + private static final String INPUT_TAG = "input"; + + @Override + protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) { + return new MongoDbWriteSchemaTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mongodb_write:v1"; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + /** Configuration class for the MongoDB Write transform. 
*/ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class MongoDbWriteSchemaTransformConfiguration implements Serializable { + + @SchemaFieldDescription("The connection URI for the MongoDB server.") + public abstract String getUri(); + + @SchemaFieldDescription("The MongoDB database to write to.") + public abstract String getDatabase(); + + @SchemaFieldDescription("The MongoDB collection to write to.") + public abstract String getCollection(); + + // @SchemaFieldDescription("The number of documents to include in each batch write.") + // @Nullable + // public abstract Long getBatchSize(); + // + // @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.") + // @Nullable + // public abstract Boolean getOrdered(); + + public void validate() { + checkArgument(getUri() != null && !getUri().isEmpty(), "MongoDB URI must be specified."); + checkArgument( + getDatabase() != null && !getDatabase().isEmpty(), "MongoDB database must be specified."); + checkArgument( + getCollection() != null && !getCollection().isEmpty(), + "MongoDB collection must be specified."); + } + + public static Builder builder() { + return new AutoValue_MongoDbWriteSchemaTransformProvider_MongoDbWriteSchemaTransformConfiguration + .Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setUri(String uri); + + public abstract Builder setDatabase(String database); + + public abstract Builder setCollection(String collection); + + public abstract Builder setBatchSize(Long batchSize); + + public abstract Builder setOrdered(Boolean ordered); + + public abstract MongoDbWriteSchemaTransformConfiguration build(); + } + } + + /** The {@link SchemaTransform} that performs the write operation. */ + private static class MongoDbWriteSchemaTransform extends SchemaTransform { + private final MongoDbWriteSchemaTransformConfiguration configuration; + + MongoDbWriteSchemaTransform(MongoDbWriteSchemaTransformConfiguration configuration) { + configuration.validate(); + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + PCollection<Row> rows = input.get(INPUT_TAG); + + PCollection<Document> documents = + rows.apply("ConvertToDocument", ParDo.of(new RowToBsonDocumentFn())); + + MongoDbIO.Write write = + MongoDbIO.write() + .withUri(configuration.getUri()) + .withDatabase(configuration.getDatabase()) + .withCollection(configuration.getCollection()); + + // if (configuration.getBatchSize() != null) { + // write = write.withBatchSize(configuration.getBatchSize()); + // } + // if (configuration.getOrdered() != null) { + // write = write.withOrdered(configuration.getOrdered()); + // } + + documents.apply("WriteToMongo", write); + + return PCollectionRowTuple.empty(input.getPipeline()); + } + } + + /** A {@link DoFn} to convert a Beam {@link Row} to a MongoDB {@link Document}. */ + private static class RowToMongoDocumentFn extends DoFn<Row, Document> { + @ProcessElement + public void processElement(@Element Row row, OutputReceiver<Document> out) { + Document doc = new Document(); + for (int i = 0; i < row.getSchema().getFieldCount(); i++) { + String fieldName = row.getSchema().getField(i).getName(); + Object value = row.getValue(i); + + if (value != null) { + doc.append(fieldName, value); + } + } + out.output(doc); + } + } Review Comment:  The class `RowToMongoDocumentFn` appears to be unused. The transform uses `RowToBsonDocumentFn`. 
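Note that `RowToBsonDocumentFn` is referenced in `expand` but its definition does not appear in this diff. If it is not provided elsewhere in the module, a minimal sketch of such a converter (a hypothetical implementation, essentially the same field-by-field copy performed by the unused class) could look like the following; a production version would also need to map nested rows, arrays, and logical types onto their BSON equivalents.

```java
// Hypothetical sketch only: relies on the imports already present in this file
// (DoFn, Row, Document, and Schema.Field).
private static class RowToBsonDocumentFn extends DoFn<Row, Document> {
  @ProcessElement
  public void processElement(@Element Row row, OutputReceiver<Document> out) {
    Document doc = new Document();
    // Copy every non-null schema field straight into the BSON document.
    for (Field field : row.getSchema().getFields()) {
      Object value = row.getValue(field.getName());
      if (value != null) {
        doc.append(field.getName(), value);
      }
    }
    out.output(doc);
  }
}
```
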
Please remove the unused `RowToMongoDocumentFn` class to improve code clarity. ########## sdks/java/io/mongodb/src/main/java/org/apache/beam/sdk/io/mongodb/MongoDbWriteSchemaTransformProvider.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.mongodb; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.bson.Document; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for writing to MongoDB. + * + * <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@AutoService(SchemaTransformProvider.class) +public class MongoDbWriteSchemaTransformProvider + extends TypedSchemaTransformProvider< + MongoDbWriteSchemaTransformProvider.MongoDbWriteSchemaTransformConfiguration> { + + private static final String INPUT_TAG = "input"; + + @Override + protected SchemaTransform from(MongoDbWriteSchemaTransformConfiguration configuration) { + return new MongoDbWriteSchemaTransform(configuration); + } + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:mongodb_write:v1"; + } + + @Override + public List<String> inputCollectionNames() { + return Collections.singletonList(INPUT_TAG); + } + + /** Configuration class for the MongoDB Write transform. 
*/ + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class MongoDbWriteSchemaTransformConfiguration implements Serializable { + + @SchemaFieldDescription("The connection URI for the MongoDB server.") + public abstract String getUri(); + + @SchemaFieldDescription("The MongoDB database to write to.") + public abstract String getDatabase(); + + @SchemaFieldDescription("The MongoDB collection to write to.") + public abstract String getCollection(); + + // @SchemaFieldDescription("The number of documents to include in each batch write.") + // @Nullable + // public abstract Long getBatchSize(); + // + // @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.") + // @Nullable + // public abstract Boolean getOrdered(); Review Comment:  The configuration properties `batchSize` and `ordered` are commented out, but their builder methods exist. This functionality should be enabled to allow users to configure these important write options. Please uncomment these properties and their usage in the `MongoDbWriteSchemaTransform.expand` method. ```suggestion @SchemaFieldDescription("The number of documents to include in each batch write.") @Nullable public abstract Long getBatchSize(); @SchemaFieldDescription("Whether the writes should be performed in an ordered manner.") @Nullable public abstract Boolean getOrdered(); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
