Thank you for your answer!
This somewhat goes against what is written here,
https://beam.apache.org/documentation/io/authoring-overview/#read-transforms,
which states that one can also use a ParDo operation. So should I remove
the PTransform and read directly from the database using a custom ParDo?
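If I go that way, I imagine it would look something like this rough
sketch (reusing the Session factory and Entity model from my original
message below; nothing here is tested):

import apache_beam as beam

class ReadEntitiesFn(beam.DoFn):
    """Hypothetical ParDo-based read: each input element is a query parameter."""
    def process(self, arg1):
        session = Session()
        try:
            # Stream matching entities out of the database.
            for entity in session.query(Entity).filter(Entity.arg1 == arg1):
                yield entity
        finally:
            session.close()

with beam.Pipeline(options=pipeline_options) as p:
    entities = (
        p
        | "Seed query parameters" >> beam.Create(["123456"])
        | "Read from PostgreSQL" >> beam.ParDo(ReadEntitiesFn())
    )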
On 01/10/2018 at 15:32, Abhijit Chanda wrote:
Also refer to the Beam documentation on designing your pipeline:
<https://beam.apache.org/documentation/pipelines/design-your-pipeline/>
From: Abhijit Chanda <[email protected]>
Date: Monday, 1 October 2018 at 6:58 PM
To: Jonathan Perron <[email protected]>, "[email protected]" <[email protected]>
Subject: Re: [Need advices] Troubles to create custom PTransform in Python
You can’t call a transform function as the very first step in any
pipeline. The first block should always be a source, followed by any
number of transforms based on your use case.
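For example, a minimal sketch:

p = beam.Pipeline(options=pipeline_options)
result = (
    p
    | "Read source" >> beam.Create([1, 2, 3])  # or a real source, e.g. beam.io.ReadFromText(...)
    | "Apply transform" >> beam.Map(lambda x: x * 2)
)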
From: Jonathan Perron <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, 1 October 2018 at 3:14 PM
To: "[email protected]" <[email protected]>
Subject: Fwd: [Need advices] Troubles to create custom PTransform in Python
Hello everybody,
I am new to Apache Beam and I need some advice on how to properly
write a custom PTransform in Python.
I am trying to read entities from a PostgreSQL database using SQLAlchemy. I
followed the examples in the documentation for Pub/Sub
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/pubsub.html#ReadFromPubSub)
and Datastore
(https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/datastore/v1/datastoreio.html#ReadFromDatastore).
Here is what I achieved so far:
class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract PCollection from PostgreSQL
    """
    def start_bundle(self):
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def start_bundle(self, arg1, arg2):
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        return (self._arg1, entities)
class ExtractEntity(PTransform):
    """
    A ``PTransform`` extracting Entity objects.
    """
    def __init__(self, arg1, arg2="enabled"):
        """
        Initializes ``ExtractEntity``.

        Args:
            arg1
            arg2
        """
        if not arg1:
            raise ValueError("arg1 cannot be empty")
        self._arg1 = arg1
        self._arg2 = arg2

    def expand(self, pcoll):
        """
        This composite transform involves the following steps:
        1. Create a query object.
        2. Run the query against the database.
        """
        database_entities = pcoll.pipeline | "Extract datastore users" >> beam.ParDo(
            ReadEntityFromPostgresqlFn(self._arg1, self._arg2)
        )
        return database_entities
The PTransform is called at the very beginning of my pipeline:
p = beam.Pipeline(options=pipeline_options)
database = (
    p
    | "Extract datastore users" >> ExtractEntity(arg1="123456")
)
It keeps raising AttributeError: 'PBegin' object has no attribute
'windowing'.
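Looking at it again, I wonder if the issue is that beam.ParDo is applied
to pcoll.pipeline, which is a PBegin rather than a real PCollection.
Maybe expand() should first seed a PCollection, something like the
following guess (with the arguments passed as an element instead of
through start_bundle):

def expand(self, pcoll):
    return (
        pcoll.pipeline
        # Seed a real PCollection so ParDo has elements to process.
        | "Seed arguments" >> beam.Create([(self._arg1, self._arg2)])
        | "Extract entities" >> beam.ParDo(ReadEntityFromPostgresqlFn())
    )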
Please note that it's only a draft (I will extract several entities
from the database, so the query will not be "hard-coded" but passed as
a parameter at some point).
I thus have several questions:
1) Does anyone know why what I am trying to achieve is not working?
2) Is this the right way to proceed, i.e. creating a custom PTransform
which executes ParDo operations, or should I use ParDo operations
directly?
3) Apart from
https://beam.apache.org/contribute/ptransform-style-guide/#language-neutral-considerations,
is there a guide on how to properly write a custom PTransform?
4) Is it better to use the low-level psycopg2 driver here, or is
SQLAlchemy fine?
Many thanks in advance for your time and help!
Jonathan