NagisaVon commented on code in PR #22400:
URL: https://github.com/apache/beam/pull/22400#discussion_r983775832


##########
sdks/python/apache_beam/io/mongodbio.py:
##########
@@ -772,36 +789,46 @@ def display_data(self):
 
 
 class _MongoSink:
-  def __init__(self, uri=None, db=None, coll=None, extra_params=None):
+  def __init__(
+      self, uri=None, db=None, coll=None, extra_params=None, writeFn=None):
     if extra_params is None:
       extra_params = {}
     self.uri = uri
     self.db = db
     self.coll = coll
     self.spec = extra_params
     self.client = None
+    # writeFn: optional callable(client, db, coll, documents, logger) that
+    # performs the actual write; when None, _defaultWriteFn (upsert each
+    # document matched by _id) is used.
+    self.writeFn = writeFn
+    if writeFn is None:
+      self.writeFn = self._defaultWriteFn
+
+  @staticmethod
+  def _defaultWriteFn(client, db, coll, documents, logger):
+    """Default writeFn: upsert every document, matching on its ``_id``.
+
+    Builds one ``ReplaceOne(upsert=True)`` request per document and sends
+    them all in a single ``bulk_write`` call. An existing document with the
+    same ``_id`` is overwritten, and a missing one is inserted.
+
+    To customize the write (for example, ``UpdateOne`` instead of
+    ``ReplaceOne``), pass your own ``writeFn`` to the sink, using this
+    function as a template. The sink calls it as a plain function with
+    exactly these five arguments, so it must not take a ``self`` argument.
+
+    Args:
+      client: the ``MongoClient`` created by the sink.
+      db: database name (used as ``client[db]``).
+      coll: collection name (used as ``client[db][coll]``).
+      documents: iterable of dict-like documents to write.
+      logger: logger used to report the bulk-write result at DEBUG level.
+    """
+    requests = [
+        ReplaceOne(
+            filter={"_id": doc.get("_id", None)}, replacement=doc, upsert=True)
+        for doc in documents
+    ]
+    if not requests:
+      # bulk_write rejects an empty request list; there is nothing to write.
+      return
+    # Issue ONE bulk write for the whole batch, after all requests are built.
+    # Calling it inside the loop would re-send every earlier document once
+    # per subsequent document.
+    resp = client[db][coll].bulk_write(requests)
+    # set logger to debug level to log the response
+    logger.debug(
+        "BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, "
+        "nMatched:%d, Errors:%s" % (
+            resp.modified_count,
+            resp.upserted_count,
+            resp.matched_count,
+            resp.bulk_api_result.get("writeErrors"),
+        ))
 
   def write(self, documents):
     if self.client is None:
       self.client = MongoClient(host=self.uri, **self.spec)
-    requests = []
-    for doc in documents:
-      # match document based on _id field, if not found in current collection,
-      # insert new one, otherwise overwrite it.
-      requests.append(
-          ReplaceOne(
-              filter={"_id": doc.get("_id", None)},
-              replacement=doc,
-              upsert=True))
-    resp = self.client[self.db][self.coll].bulk_write(requests)
-    _LOGGER.debug(
-        "BulkWrite to MongoDB result in nModified:%d, nUpserted:%d, "
-        "nMatched:%d, Errors:%s" % (
-            resp.modified_count,
-            resp.upserted_count,
-            resp.matched_count,
-            resp.bulk_api_result.get("writeErrors"),
-        ))
+    # Delegate the actual write to the configured writeFn. It is stored as an
+    # instance attribute, so it is not bound and receives exactly these five
+    # arguments (no self).
+    self.writeFn(self.client, self.db, self.coll, documents, _LOGGER)

Review Comment:
   The docs are at line 696; any suggestions to make them clearer?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to