Just an addendum: you should be able to fake this in the meantime by
starting with an initial beam.Create and using state and timers. One
problem with the source as written is that it never checkpoints, so if
your pipeline crashes it will start again from the beginning (including
all the downstream processing). Instead, you could read for a while, get
a resume token from your cursor, store it in state, set a timer, and
exit the DoFn. In your timer callback, you would resume reading from
the stored token for a while and then set another timer, just as
before. See
https://s.apache.org/beam-python-user-state-and-timers and related
docs for all the details.
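
Roughly, the control flow I have in mind looks like the sketch below.
This is plain Python with no Beam or MongoDB in it, just to show the
read-a-batch, save-the-token, come-back-later loop. FAKE_OPLOG,
read_changes and CheckpointingReader are made-up names for
illustration. In a real DoFn the token would live in user state and
on_timer would be your timer callback. With pymongo, each change
document's _id is its resume token and watch() accepts it as
resume_after.

```python
from typing import Iterator, List, Optional, Tuple

# Hypothetical stand-in for the oplog: (resume_token, event) pairs.
FAKE_OPLOG = [(i, "event-%d" % i) for i in range(1, 8)]


def read_changes(resume_after: Optional[int]) -> Iterator[Tuple[int, str]]:
    """Yield changes strictly after the given token (None = from the start)."""
    for token, event in FAKE_OPLOG:
        if resume_after is None or token > resume_after:
            yield token, event


class CheckpointingReader:
    """Plays the role of the stateful DoFn.

    `self.token` stands in for the state cell, and each call to
    `on_timer` stands in for one timer firing.
    """

    def __init__(self, batch_size: int) -> None:
        self.batch_size = batch_size
        self.token: Optional[int] = None  # would be stored in Beam state

    def on_timer(self) -> List[str]:
        out: List[str] = []
        for token, event in read_changes(self.token):
            out.append(event)
            self.token = token  # remember how far we got
            if len(out) >= self.batch_size:
                # Stop here so the runner gets a chance to checkpoint;
                # a real DoFn would set the next timer before returning.
                break
        return out


reader = CheckpointingReader(batch_size=3)
first_batch = reader.on_timer()

# Simulate a crash: a fresh reader restored from the saved token
# picks up where the old one stopped instead of starting over.
restored = CheckpointingReader(batch_size=3)
restored.token = reader.token
second_batch = restored.on_timer()
```

The point is that each callback does a bounded amount of work and
returns, so progress is recorded between batches rather than never.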

Don't hesitate to respond to the thread if anything isn't clear or you
have additional questions (or success stories!).

- Robert

On Tue, Nov 13, 2018 at 2:25 PM Robert Bradshaw <rober...@google.com> wrote:
>
> The future of Beam sources is SDF, see
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>
> This is actively being worked on, but we're still in the present. For
> bounded sources, you still may want to use the Source API (which, in
> Python, is much closer to what SDF is settling down to be, so it
> should be an easy port once that time comes). Unfortunately, Python
> Streaming doesn't yet support anything but PubSub.
> On Tue, Nov 13, 2018 at 12:24 PM David Gasquez <davidgasq...@gmail.com> wrote:
> >
> > Hey there,
> >
> > I've been exploring Apache Beam lately and I'm now working on my first 
> > production pipeline. The goal of this pipeline is to replicate a MongoDB 
> > collection into BigQuery. To do that I want to read the MongoDB Oplog and 
> > use these events to update the table in BigQuery (happy to expand more on 
> > this if needed).
> >
> > The MongoDB Oplog is an unbounded source. I was wondering what the best 
> > practices are for dealing with this kind of source in Python. Currently, 
> > I'm using a custom beam.DoFn to read the Oplog inside a streaming pipeline. 
> > That said, I'm not sure how this will behave or how it can be improved (the 
> > pipeline relies on a beam.Create([0]) first step that seems hacky to me).
> >
> > These are the key snippets of the code:
> >
> > ```
> > import apache_beam as beam
> > import pymongo
> >
> > class OplogSourceDoFn(beam.DoFn):
> >     def __init__(self, uri, database, collection):
> >         super(OplogSourceDoFn, self).__init__()
> >         self.uri = uri
> >         self.database = database
> >         self.collection = collection
> >
> >     def client(self):
> >         self._client = pymongo.MongoClient(
> >             self.uri, readPreference="secondaryPreferred")
> >         return self._client
> >
> >     def process(self, element):
> >         client = self.client()
> >         self.db = client.get_database(self.database)
> >         self.col = self.db.get_collection(self.collection)
> >         self.cursor = self.col.watch(full_document="updateLookup")
> >
> >         with self.cursor as stream:
> >             for change in stream:
> >                 yield change
> >
> > pipeline = (
> >     p
> >     | 'dummy_create' >> beam.Create([0])
> >     | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
> >     | 'process' >> beam.Map(process)
> > )
> > ```
> >
> > My hunch is that there's a way to leverage the StreamingCreate PTransform 
> > to read the MongoDB Oplog or any other external unbounded source. 
> > Alternatively, I've also seen a good example of how to create a 
> > BoundedSource. This might be similar for an unbounded one, but I think the 
> > Beam Programming Guide discourages building sources using the Source API.
> >
> > I'd appreciate any input or feedback you might have about the code and 
> > approach I'm taking!
> >
> > Thanks,
> > David.
