Re: Multiple concurrent transforms with SparkRunner

2018-11-13 Thread Juan Carlos Garcia
I suggest to play around with some spark configurations like: dynamic
execution parameters

https://spark.apache.org/docs/latest/configuration.html#dynamic-allocation



Am Mi., 14. Nov. 2018, 02:28 hat Jiayue Zhang (Bravo) 
geschrieben:

> Hi,
>
> I'm writing Java Beam code to run with both Dataflow and Spark. The input
> files are tfrecord format and are from multiple directories. Java
> TFRecordIO doesn't have readAll from list of files so what I'm doing is:
>
> for (String dir: listOfDirs) {
> p.apply(TFRecordIO.read().from(dir))
>  .apply(ParDo.of(new BatchElements()))
>  .apply(ParDo.of(new Process()))
>  .apply(Combine.globally(new CombineResult()))
>  .apply(TextIO.write().to(dir))
> }
>
> These directories are fairly independent and I only need result of each
> directory. When running on Dataflow, processing of these directories happen
> concurrently. But when running with Spark, I saw the spark jobs and stages
> are sequential. It needs finish all steps in one directory before moving to
> next one. What's the way to make multiple transforms run concurrently with
> SparkRunner?
>


Multiple concurrent transforms with SparkRunner

2018-11-13 Thread Jiayue Zhang (Bravo)
Hi,

I'm writing Java Beam code to run with both Dataflow and Spark. The input
files are tfrecord format and are from multiple directories. Java
TFRecordIO doesn't have readAll from list of files so what I'm doing is:

for (String dir: listOfDirs) {
p.apply(TFRecordIO.read().from(dir))
 .apply(ParDo.of(new BatchElements()))
 .apply(ParDo.of(new Process()))
 .apply(Combine.globally(new CombineResult()))
 .apply(TextIO.write().to(dir))
}

These directories are fairly independent and I only need result of each
directory. When running on Dataflow, processing of these directories happen
concurrently. But when running with Spark, I saw the spark jobs and stages
are sequential. It needs finish all steps in one directory before moving to
next one. What's the way to make multiple transforms run concurrently with
SparkRunner?


Re: [Call for items] November Beam Newsletter

2018-11-13 Thread Rui Wang
Hi,

I just added some thing related to BeamSQL.

-Rui

On Tue, Nov 13, 2018 at 3:26 AM Etienne Chauchot 
wrote:

> Hi,
> I just added some things that were done.
>
> Etienne
>
> Le lundi 12 novembre 2018 à 12:22 +, Matthias Baetens a écrit :
>
> Looks great, thanks for the effort and for including the Summit blogpost,
> Rose!
>
> On Thu, 8 Nov 2018 at 22:55 Rose Nguyen  wrote:
>
> Hi Beamers:
>
> Time to sync with the community on all the awesome stuff we've been doing!
>
> *Add the highlights from October to now (or planned events and talks) that
> you want to share by 11/14 11:59 p.m. PDT.*
>
> We will collect the notes via Google docs but send out the final version
> directly to the user mailing list. If you do not know how to format
> something, it is OK to just put down the info and I will edit. I'll ship
> out the newsletter on 11/15.
>
> [1]
> https://docs.google.com/document/d/1kKQ4a9RdptB6NwYlqmI9tTcdLAUzDnWi2dkvUi0J_Ww
> --
> Rose Thị Nguyễn
>
> --
>
>
>


Re: ValueState for Dataflow runner and MapState for others

2018-11-13 Thread Lukasz Cwik
Depends on how complex the DoFn is, but you should be able to share parts
of the implementation in a static method that both implementations invoke.

On Mon, Nov 12, 2018 at 4:59 PM Dmitry Minaev  wrote:

> Yes, sure, that'll work, I will just have to support 2 different
> implementations. I was hoping there is something more elegant.
> Thank you Lukasz, I appreciate the response!
>
> --
> Dmitry
>
> On Mon, Nov 12, 2018 at 2:02 PM Lukasz Cwik  wrote:
>
>> Could you write two different implementations of the DoFn and put your
>> processing logic in another function that both DoFn's would invoke after
>> doing any accessing of the state?
>>
>> Then during pipeline construction you could choose to apply the Map one
>> or the Value one based upon which runner your using.
>>
>>
>>
>> On Mon, Nov 12, 2018 at 10:43 AM Dmitry Minaev  wrote:
>>
>>> Hi everyone,
>>>
>>> Since Dataflow doesn't support MapState (
>>> https://issues.apache.org/jira/browse/BEAM-1474) I'm thinking of using
>>> ValueState with a Map<> inside it. Is it a good idea? Here is an example
>>> code:
>>> ```
>>> @StateId("myValueStore")
>>> private final StateSpec>> valueStoreSpec
>>> = StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
>>>
>>> @ProcessElement
>>> public void processElement( ProcessContext
>>> context, @StateId("myValueStore") MapState valueStore) {
>>> ...
>>> }
>>> ```
>>>
>>> I'd like to support other runners as well (e.g. FlinkRunner) and it
>>> seems to be more efficient to use MapState in cases where I need to store
>>> a map of values. So I'm thinking of the way to use MapState and ValueState
>>> for different runners.
>>>
>>> I understand how to get a runner in runtime via pipeline options but I'm
>>> not sure how to approach defining (and using) different StateSpec for
>>> different runners.
>>>
>>> Here is a sample code for MapState:
>>> ```
>>> @StateId("myMapStore")
>>> private final StateSpec> mapStoreSpec =
>>> StateSpecs.map(StringUtf8Coder.of(), StringUtf8Coder.of());
>>>
>>> @ProcessElement
>>> public void processElement( ProcessContext context,
>>> @StateId("myMapStore") MapState mapStore) {
>>> ...
>>> }
>>> ```
>>>
>>> Any ideas?
>>>
>>> Thank you,
>>> Dmitry
>>>
>>


Inserting datastore projected queries in Postgresql

2018-11-13 Thread Jonathan Perron

Hello fellow Apache Beam users,

I am trying to copy datastore entities to a PostgreSQL instance. As I 
don't need each fields, I performed projections following [this 
snippet][1]. I build the following query:


    public static Query DatastoreQuery() {
    Query.Builder query = Query.newBuilder();

    // Add filter
    query.addKindBuilder().setName("FOO");
    query.setFilter(makeFilter("bar", 
PropertyFilter.Operator.EQUAL, makeValue("fuz")));


    // Add projections
query.addProjection(Projection.newBuilder().setProperty(PropertyReference.newBuilder().setName("createdAt")));

    return query.build();
    }

This query is then used in the pipeline:

pipeline.apply(DatastoreIO.v1().read().withProjectId(options.getProjectId())
    .withQuery(ExtractDatastore.DatastoreQuery()));

Following 
https://stackoverflow.com/questions/44987493/unable-to-addprojection-to-relation-field-in-envers-query, 
I am expecting to get a `Map`. I would like to follow 
[Apache Beam documentation][2] to insert the entities in PostgreSQL, 
using a code similar to:


    .apply(JdbcIO.>write()
.withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(
    "com.google.cloud.sql.postgres",
    jdbcUrl
    )
    .withUsername(username)
    .withPassword(password))
    .withQuery("")
    .withPreparedStatementSetter());

As I have found no examples, here are my questions are the following:

1) How to handle the elements from the query ? In the example of my 
projection, how do I get access to `createdAt` which is a timestamp ? In 
python, I did it with : `value.timestamp_value.ToDatetime()`. Is there 
an equivalent in Java ?


2) Are the entities really under the format `>` as 
described in the SO issue ?


3) Can one apply JdbcIO.write() to `Map` or does it 
require to be under the `>` format as described 
in the small snippet of the documentation ?


I am grateful to take any inputs on this topic.

Best regards,

Jonathan

[1]: 
https://github.com/googleapis/google-cloud-java/blob/master/google-cloud-examples/src/main/java/com/google/cl%60enter%20code%20here%60oud/examples/datastore/snippets/QuerySnippets.java#L111

[2]: https://beam.apache.org/releases/javadoc/2.8.0/



Re: Reading from custom Unbounded Sources with the Python SDK

2018-11-13 Thread Robert Bradshaw
On Tue, Nov 13, 2018 at 3:44 PM David Gasquez 
wrote:
>
> Thanks so much for the help here Robert. If I understood it correctly, to
work with an unbounded source in Python right now the best approach is to
fake it using an initial create, state and timers.

Yep, exactly.

> If that's the case, do you have any code samples I can learn from? Not
sure how that looks to be honest.

I can't think of any examples offhand, but it's look something like


class OplogSourceDoFn(beam.DoFn):

resume_token_spec = beam.DoFn.BagStateSpec('resume',
beam.coders.BytesCoder())
timer_spec = beam.DoFn.TimerSpec('continue', TimeDomain.EVENT_TIME)

def __init__(self, uri, database, collection, batch_size=100):
super(OplogSourceDoFn, self).__init__()
self.uri = uri
self.database = database
self.collection = collection
self.batch_size = batch_size

def client(self):
self._client = pymongo.MongoClient(self.uri,
readPreference="secondaryPreferred")
return self._client

def process(self, element, timer=DoFn.TimerParam(timer_spec),
now=DoFn.TimestampParam):
# Set a timer to get things going.  This will fire soon.
timer.set(beam.Timestamp.of(time.time()))

@on_timer(timer_spec)
def resume(self, timer=DoFn.TimerParam(timer_spec),
resume_tokens=DoFn.StateParam(resume_token_spec)):

# Get the latest mongodb resume token, if any.
last_token = any(resume_tokens.read()) or None

# Open the stream.
client = self.client()
self.db = client.get_database(self.database)
self.col = self.db.get_collection(self.collection)
self.cursor =
self.col.watch(full_document="updateLookup", resume_after=last_token)

# Read at most batch_size elements from the stream.
change = None
with self.cursor as stream:
for _, change in zip(range(self.batch_size, stream)):
yield change

# If anything was read, set the last one as our resume token.
if change:
  resume_tokens.clear()
  resume_tokens.add(change.get('_id'))

# Schedule this method to run again.
timer.set(beam.Timestamp.of(time.time()))


Warning, this is email-window authored, completely untested code, so YYMV.
But hopefully it serves as an example of the core concepts.

> Finally, I'm thinking it might be simpler for me to use the provided
PubSub streaming source. That'd mean using an external service to place the
events in PubSub. It should also take care of checkpointing and handling
errors!

If you have somewhere you can run a process that reads from mongo and
publishes to pubsub, that'd do too.

> On Tue, 13 Nov 2018 at 14:33, Robert Bradshaw  wrote:
>>
>> Just an addendum, you should be able to fake this in the meantime by
>> starting with an initial create and using state and timers. One
>> problem with the source as written above is that it will never
>> checkpoint, meaning if your pipeline crashes it will start again from
>> the beginning (including all the downstream processing). You could
>> possibly get a resume token from your cursor, store that to state, and
>> exit the DoFn. In your timer callback, you would resume reading for a
>> while and then set another timer, just as before. See
>> https://s.apache.org/beam-python-user-state-and-timers and related
>> docs for all the details.
>>
>> Don't hesitate to respond to the thread if anything isn't clear or you
>> have additional questions (or success stories!).
>>
>> - Robert
>>
>> On Tue, Nov 13, 2018 at 2:25 PM Robert Bradshaw 
wrote:
>> >
>> > The future of Beam sources is SDF, see
>> > https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>> >
>> > This is actively being worked on, but we're still in the present. For
>> > bounded sources, you still may want to use the Source API (which, in
>> > Python, is much closer to what SDF is settling down to be, so it
>> > should be an easy port once that time comes). Unfortunately, Python
>> > Streaming doesn't yet support anything but PubSub.
>> > On Tue, Nov 13, 2018 at 12:24 PM David Gasquez 
wrote:
>> > >
>> > > Hey there,
>> > >
>> > > I've been exploring Apache Beam lately and I'm now working on my
first production pipeline. The goal of this pipeline is to replicate a
MongoDB Collection into Big Query. To do that I want to read MongoDB Oplog
and use these events to update the table in Big Query (happy to expand more
on this if needed).
>> > >
>> > > MongoDB Oplog is an unbounded source. I was wondering what are the
best practices dealing with this kind of sources in Python. Currently, I'm
using a custom beam.DoFn to read the Oplog inside a streaming pipeline.
That said, I'm not sure how this will behave and how can be improved (the
pipeline relies on a beam.Create([0]) first step that seems hacky to me).
>> > >
>> > > This are the key snippets of the code:
>> > >
>> > > ```
>> > > class OplogSourceDoFn(beam.DoFn):
>> > > def __init__

Re: Reading from custom Unbounded Sources with the Python SDK

2018-11-13 Thread David Gasquez
Thanks so much for the help here Robert. If I understood it correctly, to
work with an unbounded source in Python right now the best approach is to
fake it using an *initial create, state and timers*. If that's the case, do
you have any code samples I can learn from? Not sure how that looks to be
honest.

Finally, I'm thinking it might be simpler for me to use the provided PubSub
streaming source. That'd mean using an external service to place the events
in PubSub. It should also take care of checkpointing and handling errors!



On Tue, 13 Nov 2018 at 14:33, Robert Bradshaw  wrote:

> Just an addendum, you should be able to fake this in the meantime by
> starting with an initial create and using state and timers. One
> problem with the source as written above is that it will never
> checkpoint, meaning if your pipeline crashes it will start again from
> the beginning (including all the downstream processing). You could
> possibly get a resume token from your cursor, store that to state, and
> exit the DoFn. In your timer callback, you would resume reading for a
> while and then set another timer, just as before. See
> https://s.apache.org/beam-python-user-state-and-timers and related
> docs for all the details.
>
> Don't hesitate to respond to the thread if anything isn't clear or you
> have additional questions (or success stories!).
>
> - Robert
>
> On Tue, Nov 13, 2018 at 2:25 PM Robert Bradshaw 
> wrote:
> >
> > The future of Beam sources is SDF, see
> > https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
> >
> > This is actively being worked on, but we're still in the present. For
> > bounded sources, you still may want to use the Source API (which, in
> > Python, is much closer to what SDF is settling down to be, so it
> > should be an easy port once that time comes). Unfortunately, Python
> > Streaming doesn't yet support anything but PubSub.
> > On Tue, Nov 13, 2018 at 12:24 PM David Gasquez 
> wrote:
> > >
> > > Hey there,
> > >
> > > I've been exploring Apache Beam lately and I'm now working on my first
> production pipeline. The goal of this pipeline is to replicate a MongoDB
> Collection into Big Query. To do that I want to read MongoDB Oplog and use
> these events to update the table in Big Query (happy to expand more on this
> if needed).
> > >
> > > MongoDB Oplog is an unbounded source. I was wondering what are the
> best practices dealing with this kind of sources in Python. Currently, I'm
> using a custom beam.DoFn to read the Oplog inside a streaming pipeline.
> That said, I'm not sure how this will behave and how can be improved (the
> pipeline relies on a beam.Create([0]) first step that seems hacky to me).
> > >
> > > This are the key snippets of the code:
> > >
> > > ```
> > > class OplogSourceDoFn(beam.DoFn):
> > > def __init__(self, uri, database, collection):
> > > super(OplogSourceDoFn, self).__init__()
> > > self.uri = uri
> > > self.database = database
> > > self.collection = collection
> > >
> > > def client(self):
> > > self._client = pymongo.MongoClient(self.uri,
> readPreference="secondaryPreferred")
> > > return self._client
> > >
> > > def process(self, element):
> > > client = self.client()
> > > self.db = client.get_database(self.database)
> > > self.col = self.db.get_collection(self.collection)
> > > self.cursor = self.col.watch(full_document="updateLookup")
> > >
> > > with self.cursor as stream:
> > > for change in stream:
> > > yield change
> > >
> > > pipeline = (
> > > p
> > > | 'dummy_create' >> beam.Create([0])
> > > | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE,
> COLLECTION))
> > > | 'process' >> beam.Map(process)
> > > )
> > > ```
> > >
> > > My hunch is that there's a way to leverage the StreamingCreate
> PTransform to read MongoDB Oplog or any other external unbounded source.
> Alternatively, I've also seen a good example on how to create a
> BoundedSource. This might be similar for an unbounded one but I think the
> Beam Programming Guide discourages building sources using the Source API.
> > >
> > > I'd appreciate any input or feedback you might have about the code and
> approach I'm taking!
> > >
> > > Thanks,
> > > David.
>


Re: Reading from custom Unbounded Sources with the Python SDK

2018-11-13 Thread Robert Bradshaw
Just an addendum, you should be able to fake this in the meantime by
starting with an initial create and using state and timers. One
problem with the source as written above is that it will never
checkpoint, meaning if your pipeline crashes it will start again from
the beginning (including all the downstream processing). You could
possibly get a resume token from your cursor, store that to state, and
exit the DoFn. In your timer callback, you would resume reading for a
while and then set another timer, just as before. See
https://s.apache.org/beam-python-user-state-and-timers and related
docs for all the details.

Don't hesitate to respond to the thread if anything isn't clear or you
have additional questions (or success stories!).

- Robert

On Tue, Nov 13, 2018 at 2:25 PM Robert Bradshaw  wrote:
>
> The future of Beam sources is SDF, see
> https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html
>
> This is actively being worked on, but we're still in the present. For
> bounded sources, you still may want to use the Source API (which, in
> Python, is much closer to what SDF is settling down to be, so it
> should be an easy port once that time comes). Unfortunately, Python
> Streaming doesn't yet support anything but PubSub.
> On Tue, Nov 13, 2018 at 12:24 PM David Gasquez  wrote:
> >
> > Hey there,
> >
> > I've been exploring Apache Beam lately and I'm now working on my first 
> > production pipeline. The goal of this pipeline is to replicate a MongoDB 
> > Collection into Big Query. To do that I want to read MongoDB Oplog and use 
> > these events to update the table in Big Query (happy to expand more on this 
> > if needed).
> >
> > MongoDB Oplog is an unbounded source. I was wondering what are the best 
> > practices dealing with this kind of sources in Python. Currently, I'm using 
> > a custom beam.DoFn to read the Oplog inside a streaming pipeline. That 
> > said, I'm not sure how this will behave and how can be improved (the 
> > pipeline relies on a beam.Create([0]) first step that seems hacky to me).
> >
> > This are the key snippets of the code:
> >
> > ```
> > class OplogSourceDoFn(beam.DoFn):
> > def __init__(self, uri, database, collection):
> > super(OplogSourceDoFn, self).__init__()
> > self.uri = uri
> > self.database = database
> > self.collection = collection
> >
> > def client(self):
> > self._client = pymongo.MongoClient(self.uri, 
> > readPreference="secondaryPreferred")
> > return self._client
> >
> > def process(self, element):
> > client = self.client()
> > self.db = client.get_database(self.database)
> > self.col = self.db.get_collection(self.collection)
> > self.cursor = self.col.watch(full_document="updateLookup")
> >
> > with self.cursor as stream:
> > for change in stream:
> > yield change
> >
> > pipeline = (
> > p
> > | 'dummy_create' >> beam.Create([0])
> > | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
> > | 'process' >> beam.Map(process)
> > )
> > ```
> >
> > My hunch is that there's a way to leverage the StreamingCreate PTransform 
> > to read MongoDB Oplog or any other external unbounded source. 
> > Alternatively, I've also seen a good example on how to create a 
> > BoundedSource. This might be similar for an unbounded one but I think the 
> > Beam Programming Guide discourages building sources using the Source API.
> >
> > I'd appreciate any input or feedback you might have about the code and 
> > approach I'm taking!
> >
> > Thanks,
> > David.


Re: Reading from custom Unbounded Sources with the Python SDK

2018-11-13 Thread Robert Bradshaw
The future of Beam sources is SDF, see
https://beam.apache.org/blog/2017/08/16/splittable-do-fn.html

This is actively being worked on, but we're still in the present. For
bounded sources, you still may want to use the Source API (which, in
Python, is much closer to what SDF is settling down to be, so it
should be an easy port once that time comes). Unfortunately, Python
Streaming doesn't yet support anything but PubSub.
On Tue, Nov 13, 2018 at 12:24 PM David Gasquez  wrote:
>
> Hey there,
>
> I've been exploring Apache Beam lately and I'm now working on my first 
> production pipeline. The goal of this pipeline is to replicate a MongoDB 
> Collection into Big Query. To do that I want to read MongoDB Oplog and use 
> these events to update the table in Big Query (happy to expand more on this 
> if needed).
>
> MongoDB Oplog is an unbounded source. I was wondering what are the best 
> practices dealing with this kind of sources in Python. Currently, I'm using a 
> custom beam.DoFn to read the Oplog inside a streaming pipeline. That said, 
> I'm not sure how this will behave and how can be improved (the pipeline 
> relies on a beam.Create([0]) first step that seems hacky to me).
>
> This are the key snippets of the code:
>
> ```
> class OplogSourceDoFn(beam.DoFn):
> def __init__(self, uri, database, collection):
> super(OplogSourceDoFn, self).__init__()
> self.uri = uri
> self.database = database
> self.collection = collection
>
> def client(self):
> self._client = pymongo.MongoClient(self.uri, 
> readPreference="secondaryPreferred")
> return self._client
>
> def process(self, element):
> client = self.client()
> self.db = client.get_database(self.database)
> self.col = self.db.get_collection(self.collection)
> self.cursor = self.col.watch(full_document="updateLookup")
>
> with self.cursor as stream:
> for change in stream:
> yield change
>
> pipeline = (
> p
> | 'dummy_create' >> beam.Create([0])
> | 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
> | 'process' >> beam.Map(process)
> )
> ```
>
> My hunch is that there's a way to leverage the StreamingCreate PTransform to 
> read MongoDB Oplog or any other external unbounded source. Alternatively, 
> I've also seen a good example on how to create a BoundedSource. This might be 
> similar for an unbounded one but I think the Beam Programming Guide 
> discourages building sources using the Source API.
>
> I'd appreciate any input or feedback you might have about the code and 
> approach I'm taking!
>
> Thanks,
> David.


Re: [Call for items] November Beam Newsletter

2018-11-13 Thread Etienne Chauchot
Hi,I just added some things that were done.
Etienne
Le lundi 12 novembre 2018 à 12:22 +, Matthias Baetens a écrit :
> Looks great, thanks for the effort and for including the Summit blogpost, 
> Rose!
> On Thu, 8 Nov 2018 at 22:55 Rose Nguyen  wrote:
> > Hi Beamers:
> > 
> > 
> > Time to sync with the community on all the awesome stuff we've been doing!
> > 
> > 
> > Add the highlights from October to now (or planned events and talks) that 
> > you want to share by 11/14 11:59 p.m. PDT.
> > 
> > We will collect the notes via Google docs but send out the final version 
> > directly to the user mailing list. If you
> > do not know how to format something, it is OK to just put down the info and 
> > I will edit. I'll ship out the
> > newsletter on 11/15. 
> > 
> > [1] 
> > https://docs.google.com/document/d/1kKQ4a9RdptB6NwYlqmI9tTcdLAUzDnWi2dkvUi0J_Ww
> > -- 
> > Rose Thị Nguyễn
> -- 
>  


Reading from custom Unbounded Sources with the Python SDK

2018-11-13 Thread David Gasquez
Hey there,

I've been exploring Apache Beam lately and I'm now working on my first
*production* pipeline. The goal of this pipeline is to replicate a MongoDB
Collection into Big Query. To do that I want to read MongoDB Oplog
 and use these
events to update the table in Big Query (happy to expand more on this if
needed).

MongoDB Oplog is an unbounded source. I was wondering what are the best
practices dealing with this kind of sources in Python. Currently, I'm using
a custom beam.DoFn to read the *Oplog* inside a streaming pipeline. That
said, I'm not sure how this will behave and how can be improved (the
pipeline relies on a beam.Create([0]) first step that seems hacky to me

).

This are the key snippets of the code:

```
class OplogSourceDoFn(beam.DoFn):
def __init__(self, uri, database, collection):
super(OplogSourceDoFn, self).__init__()
self.uri = uri
self.database = database
self.collection = collection

def client(self):
self._client = pymongo.MongoClient(self.uri,
readPreference="secondaryPreferred")
return self._client

def process(self, element):
client = self.client()
self.db = client.get_database(self.database)
self.col = self.db.get_collection(self.collection)
self.cursor = self.col.watch(full_document="updateLookup")

with self.cursor as stream:
for change in stream:
yield change

pipeline = (
p
| 'dummy_create' >> beam.Create([0])
| 'read_oplog' >> beam.ParDo(OplogSourceDoFn(URI, DATABASE, COLLECTION))
| 'process' >> beam.Map(process)
)
```

My hunch is that there's a way to leverage the StreamingCreate PTransform

to
read MongoDB Oplog or any other external unbounded source. Alternatively,
I've also seen a good example on how to create a BoundedSource
.
This might be similar for an unbounded one but I think the Beam Programming
Guide discourages building sources using the Source API

.

I'd appreciate any input or feedback you might have about the code and
approach I'm taking!

Thanks,
David.