Hello everybody,

I am new to Apache Beam and I need some advice on how to properly write a custom PTransform in Python.

I am trying to read entities from a PostgreSQL database using SQLAlchemy. I followed the examples in the documentation for Pub/Sub (https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/pubsub.html#ReadFromPubSub) and Datastore (https://beam.apache.org/documentation/sdks/pydoc/2.5.0/_modules/apache_beam/io/gcp/datastore/v1/datastoreio.html#ReadFromDatastore).

Here is what I achieved so far:

class ExtractFromPostgreSQLFn(beam.DoFn):
    """
    Extract a PCollection from PostgreSQL.
    """

    def start_bundle(self):
        self._session = Session()

    def process(self, element):
        raise NotImplementedError

    def finish_bundle(self):
        self._session.close()


class ReadEntityFromPostgresqlFn(ExtractFromPostgreSQLFn):
    def __init__(self, arg1, arg2):
        self._arg1 = arg1
        self._arg2 = arg2

    def process(self, element):
        entities = (
            self._session.query(Entity)
            .filter(Entity.arg1 == self._arg1)
            .filter(Entity.arg2 == self._arg2)
            .all()
        )
        yield (self._arg1, entities)


class ExtractEntity(PTransform):
    """
    A ``PTransform`` that extracts the Entity.
    """

    def __init__(self, arg1, arg2="enabled"):
        """
        Initializes ``ExtractEntity``.
        Args:
            arg1
            arg2
        """
        if not arg1:
            raise ValueError("arg1 cannot be empty")

        self._arg1 = arg1
        self._arg2 = arg2

    def expand(self, pcoll):
        """
        This composite transform involves the following steps:
        1. Create a query object.
        2. Run the query against the database.
        """
        database_entities = pcoll.pipeline | "Extract datastore users" >> beam.ParDo(
            ReadEntityFromPostgresqlFn(self._arg1, self._arg2)
        )
        return database_entities

The PTransform is called at the very beginning of my pipeline:

p = beam.Pipeline(options=pipeline_options)
database = (
    p
    | "Extract datastore users" >> ExtractEntity(arg1="123456")
)

It keeps raising "AttributeError: 'PBegin' object has no attribute 'windowing'".

Please note that this is only a draft (I will extract several entities from the database, so the query will not be hard-coded but will be passed as a parameter at some point).

I thus have several questions:

1) Does anyone know why what I am trying to achieve is not working?

2) Is this the right way to proceed, i.e. creating a custom PTransform that executes ParDo operations, or should I go directly with ParDo operations?

3) Apart from https://beam.apache.org/contribute/ptransform-style-guide/#language-neutral-considerations, is there a guide on how to properly write a custom PTransform?

4) Is it better to use the low-level psycopg2 driver here, or is SQLAlchemy fine?

Many thanks in advance for your time and help!

Jonathan


