Hello,
I am looking for a way to read data stored in PostgreSQL and don't know
whether I should write a custom Source or use ParDo operations. It is
stated that a ParDo could be used instead, but I'm not sure this is what
will solve my problem, so here I am! I managed to write to the database
using only ParDo operations, so I guess reading should also be possible.
Some details about my use case:
* I am using the Python SDK;
* Reading from the database is the first operation of the pipeline, before
some calculations are made;
* The reading is done with SQLAlchemy, but could also be done with psycopg2;
* I don't think parallelizing this operation is necessary, as the queries
are and will stay really simple (e.g. SELECT foo FROM bar WHERE fuzz).
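To give an idea of how simple the reads are, here is a rough sketch of such a query through the Python DB-API, which psycopg2 also implements. It runs against an in-memory sqlite3 database only so that it works without a server. `select_foo` is a hypothetical helper, and `bar`/`foo`/`fuzz` are the placeholder names from above, with `fuzz` assumed to be a column compared against a parameter.

```python
import sqlite3


def select_foo(conn, fuzz_value):
    """Run the simple query from the use case and return the foo values.

    Hypothetical helper: `fuzz` is assumed to be a column compared
    against a parameter, which is not stated in the original query.
    """
    cur = conn.cursor()
    try:
        # sqlite3 uses "?" placeholders; psycopg2 would use "%s" instead
        cur.execute("SELECT foo FROM bar WHERE fuzz = ?", (fuzz_value,))
        return [row[0] for row in cur.fetchall()]
    finally:
        cur.close()


if __name__ == "__main__":
    # In-memory SQLite database standing in for PostgreSQL
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE bar (foo TEXT, fuzz INTEGER)")
    conn.executemany("INSERT INTO bar VALUES (?, ?)",
                     [("a", 1), ("b", 2), ("c", 1)])
    print(sorted(select_foo(conn, 1)))
    conn.close()
```

Without an ORDER BY clause the row order is not guaranteed, hence the sorted() in the usage example.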
Here are my DoFn classes:
/import apache_beam as beam

# Session (an SQLAlchemy sessionmaker) and Entity (a mapped class) are
# defined elsewhere in my code.


class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract PCollection from PostgreSQL
    """
    def start_bundle(self):
        # Open one SQLAlchemy session per bundle
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        # Release the session once the bundle is done
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def __init__(self, arg1, arg2):
        # start_bundle() receives no extra arguments, so the filter
        # values are passed to the constructor instead
        super(ReadEntityFromPostgresqlFn, self).__init__()
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        # Emit every matching entity as an element of the output PCollection
        for entity in entities:
            yield entity/
As I said, I apply it right after initializing the pipeline:
/p = beam.Pipeline(options=pipeline_options)
psql_entities = (
    p
    | "Extract Entities from PSQL backup"
    >> beam.ParDo(ReadEntityFromPostgresqlFn(arg1, arg2))
)/
Unfortunately, I end up with an /AttributeError: 'PBegin' object has no
attribute 'windowing'/ error.
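For reference, here is my understanding of the order in which a runner calls the DoFn methods used above, as a stdlib-only simulation: start_bundle() with no arguments, then process() once per input element, then finish_bundle(). `LoggingFn` and `run_bundle` are hypothetical names; this is not Beam's runner code, and a real runner also handles multiple bundles, retries and windowing.

```python
class LoggingFn:
    """Plain-Python stand-in for a DoFn; records which method runs when."""

    def __init__(self, arg1, arg2):
        # Per-instance parameters come in through the constructor
        self.arg1, self.arg2 = arg1, arg2
        self.calls = []

    def start_bundle(self):
        # Called with no arguments besides self
        self.calls.append("start_bundle")

    def process(self, element):
        self.calls.append("process")
        yield (self.arg1, self.arg2, element)

    def finish_bundle(self):
        self.calls.append("finish_bundle")


def run_bundle(fn, elements):
    """Hypothetical, simplified driver for a single bundle."""
    fn.start_bundle()
    outputs = []
    for element in elements:
        # process() runs once per input element, so no input means no calls
        outputs.extend(fn.process(element))
    fn.finish_bundle()
    return outputs


if __name__ == "__main__":
    fn = LoggingFn("x", "y")
    print(run_bundle(fn, [1, 2]))  # [('x', 'y', 1), ('x', 'y', 2)]
    print(fn.calls)  # ['start_bundle', 'process', 'process', 'finish_bundle']
```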
Where did I make a mistake? I would welcome any input you can provide on
this topic.
Thanks for your time,
Jonathan