Thank you for your answer!

This somehow goes against what is written here, https://beam.apache.org/documentation/io/authoring-overview/#read-transforms, which states that one can also use a ParDo operation. So should I remove the PTransform and read directly from the database using a custom ParDo, e.g. something like the sketch below?
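Here is roughly what I have in mind (only a sketch; Session and Entity are the SQLAlchemy session factory and mapped class from my draft below, and pipeline_options is my existing options object):

import apache_beam as beam

class ReadFromPostgresqlFn(beam.DoFn):
    """Runs one query per input element and emits the matching rows."""

    def start_bundle(self):
        self._session = Session()  # SQLAlchemy session factory, defined elsewhere

    def process(self, element):
        arg1, arg2 = element  # each input element carries the parameters of one query
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == arg1)
            .filter(Entity.arg2 == arg2)
            .all()
        )
        yield (arg1, entities)

    def finish_bundle(self):
        self._session.close()

p = beam.Pipeline(options=pipeline_options)
entities = (
    p
    # beam.Create is the root source; the ParDo then does the actual reading
    | "Seed queries" >> beam.Create([("123456", "enabled")])
    | "Read from PostgreSQL" >> beam.ParDo(ReadFromPostgresqlFn())
)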


On 01/10/2018 at 15:32, Abhijit Chanda wrote:

Also refer to the Beam documentation on designing your pipeline: <https://beam.apache.org/documentation/pipelines/design-your-pipeline/>

From: Abhijit Chanda <[email protected]>
Date: Monday, 1 October 2018 at 6:58 PM
To: Jonathan Perron <[email protected]>, "[email protected]" <[email protected]>
Subject: Re: [Need advices] Troubles to create custom PTransform in Python

You can't apply a transform as the very first step of a pipeline. The first block should always be a source, followed by any number of transforms, depending on your use case.
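For instance, a minimal pipeline always begins with a root transform such as beam.Create or one of the built-in reads, and only then applies further transforms. Something like:

import apache_beam as beam

with beam.Pipeline() as p:
    (
        p
        | "Source" >> beam.Create(["a", "b", "c"])  # root transform producing the first PCollection
        | "Transform" >> beam.Map(str.upper)        # any number of transforms follow the source
        | "Print" >> beam.Map(print)
    )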

From: Jonathan Perron <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Monday, 1 October 2018 at 3:14 PM
To: "[email protected]" <[email protected]>
Subject: Fwd: [Need advices] Troubles to create custom PTransform in Python

Hello everybody,

I am new to Apache Beam and I need some advice on how to properly write a custom PTransform in Python.

I am trying to read entities from a PostgreSQL database using SQLAlchemy. I followed the examples in the documentation for Pub/Sub (https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/pubsub.html#ReadFromPubSub) and Datastore (https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/datastore/v1/datastoreio.html#ReadFromDatastore).

Here is what I achieved so far:

class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extracts a PCollection from PostgreSQL.
    """

    def start_bundle(self):
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def __init__(self, arg1, arg2):
        # start_bundle() takes no arguments; constructor parameters belong in __init__
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        yield (self._arg1, entities)  # process() must produce an iterable of outputs


class ExtractEntity(PTransform):
    """
    A ``PTransform`` that extracts the Entity.
    """

    def __init__(self, arg1, arg2="enabled"):
        """
        Initializes ``ExtractEntity``.
        Args:
            arg1
            arg2
        """
        super().__init__()
        if not arg1:
            raise ValueError("arg1 cannot be empty")

        self._arg1 = arg1
        self._arg2 = arg2

    def expand(self, pcoll):
        """
        This composite transform involves the following:
        1. Create a query object.
        2. Run the query against the database.
        """
        database_entities = pcoll.pipeline | "Extract datastore users" >> beam.ParDo(
            ReadEntityFromPostgresqlFn(self._arg1, self._arg2)
        )
        return database_entities

The PTransform is called at the very beginning of my pipeline:

p = beam.Pipeline(options=pipeline_options)
database = (
    p
    | "Extract datastore users" >> ExtractEntity(arg1="123456")
)

It keeps raising AttributeError: 'PBegin' object has no attribute 'windowing'.

Please note that this is only a draft (I will extract several entities from the database, so the query will not be hard-coded but passed as a parameter at some point).

I thus have several questions:

1) Does anyone know why what I am trying to achieve is not working?

2) Is this the right way to proceed, i.e. creating a custom PTransform which executes ParDo operations, or should I use ParDo operations directly?

3) Apart from https://beam.apache.org/contribute/ptransform-style-guide/#language-neutral-considerations, is there a guide on how to properly write a custom PTransform?

4) Is it better to use the low-level psycopg2 driver here, or is SQLAlchemy fine?

Many thanks in advance for your time and help!

Jonathan

