Hello everybody,
I am new to Apache Beam and I need some advice on how to properly write
a custom PTransform in Python.
I am trying to read entities from a PostgreSQL database using SQLAlchemy. I
followed the examples in the documentation for Pub/Sub
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/pubsub.html#ReadFromPubSub)
and Datastore
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/datastore/v1/datastoreio.html#ReadFromDatastore).
Here is what I have so far:

import apache_beam as beam

# Session (the SQLAlchemy session factory) and Entity (the mapped model)
# are defined elsewhere in my project.


class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract PCollection from PostgreSQL
    """

    def start_bundle(self):
        # Open one SQLAlchemy session per bundle
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def __init__(self, arg1, arg2):
        # start_bundle() is called without arguments, so the query
        # parameters are passed to the constructor instead
        super().__init__()
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        # process() must produce an iterable of outputs, so yield the pair
        yield (self._arg1, entities)

class ExtractEntity(beam.PTransform):
    """
    A ``PTransform`` that extracts the Entity.
    """

    def __init__(self, arg1, arg2="enabled"):
        """
        Initializes ``ExtractEntity``.

        Args:
            arg1: value matched against Entity.arg1 (must not be empty)
            arg2: value matched against Entity.arg2
        """
        super().__init__()
        if not arg1:
            raise ValueError("arg1 cannot be empty")

        self._arg1 = arg1
        self._arg2 = arg2

    def expand(self, pcoll):
        """
        This is a composite transform that involves the following:
        1. Create a query object.
        2. Run the query against the database.
        """
        database_entities = pcoll.pipeline | "Extract datastore users" >> beam.ParDo(
            ReadEntityFromPostgresqlFn(self._arg1, self._arg2)
        )
        return database_entities
The PTransform is applied at the very beginning of my pipeline:
p = beam.Pipeline(options=pipeline_options)
database = (
    p
    | "Extract datastore users"
    >> ExtractEntity(arg1="123456")
)
It keeps raising "AttributeError: 'PBegin' object has no attribute
'windowing'".
Please note that this is only a draft (I will extract several entities from
the database, so the query will not be hard-coded but passed as a
parameter at some point).
I therefore have several questions:
1) Does anyone know why this is not working?
2) Is this the right way to proceed, i.e. creating a custom PTransform
that runs ParDo operations, or should I apply the ParDo operations
directly?
3) Apart from
https://beam.apache.org/contribute/ptransform-style-guide/#language-neutral-considerations,
is there a guide on how to properly write a custom PTransform?
4) Is it better to use the low-level psycopg2 driver here, or is
SQLAlchemy fine?
Many thanks in advance for your time and help!
Jonathan