Hi Luke,

I had posed this question originally, and it's related to the same basic 
problem (or rather the same basic pipeline): I'd be performing a series of 
enrichments against various Kafka-driven sources and was wondering about the 
best way to chain those together.

The other question was predominantly about the actual "joining/enrichment" 
process, whereas this one is really focused on the tagging / transforms and 
how best to perform those.
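
For a bit more context, the rough shape I keep coming back to for that chaining 
is something like the following. It's only a sketch: events stands for the 
keyed event stream read from Kafka, IdentifyUsers / IdentifyIpAddresses are 
placeholders for the per-entity transforms, and the KafkaIO.write calls use the 
same wrapper signature as the code quoted below.

    // Each stage returns a PCollectionTuple: the enriched events flow on to
    // the next stage, while the extracted entities go to their own topics.
    val afterUsers = events.apply("Identify Users", IdentifyUsers())

    afterUsers
        .get(Entities.Tags.users)
        .apply("Flatten Identified Users", Flatten.iterables())
        .apply("Write Users to Kafka",
            KafkaIO.write<User>(options.usersTopic, options))

    // The next stage consumes the (already user-enriched) events.
    val afterIpAddresses = afterUsers
        .get(Entities.Tags.events)
        .apply("Identify IP Addresses", IdentifyIpAddresses())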

On 2020/05/04 20:57:23, Luke Cwik <[email protected]> wrote: 
> Is this a duplicate of
> https://lists.apache.org/thread.html/r2272040a06457cfdb867832a61f2933d1a3ba832057cffda89ee248a%40%3Cuser.beam.apache.org%3E
> ?
> 
> 
> On Tue, Apr 28, 2020 at 9:26 AM Rion Williams <[email protected]> wrote:
> 
> > Hi all,
> >
> > I'm trying to implement a process and I'm not quite sure what the most
> > efficient approach might be while still taking advantage of Beam's
> > parallelism and recommended patterns. Basically the problem can be
> > summarized as follows:
> >
> > I have a series of incoming events which are read from Kafka into my Beam
> > pipeline. These events are Avro-formatted messages which contain nearly a
> > hundred different fields, including nested records and values about the
> > event (e.g. users, IP addresses, etc.). The goal of the pipeline is
> > twofold:
> >
> > - Extract any instances of various entities (e.g. users, IP addresses,
> > etc.) from the original object, key them (using a deterministic UUID seeded
> > by all of the known "key" values; a rough sketch of that keying follows
> > this list), and send them off to their own dedicated Kafka topic (e.g. all
> > extracted users -> users_topic).
> > - Enrich the original event with the identifiers of all of the extracted
> > entities (e.g. if the event came in with an array containing two users, the
> > expectation is that the generated keys would be present on each of those
> > user instances by the time the event leaves the pipeline).
> >
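> > For what it's worth, the deterministic keying I have in mind is just a
> > name-based UUID over the concatenated "key" fields, roughly like this (the
> > field names here are made up for the example):
> >
> >     import java.util.UUID
> >
> >     // The same (username, domain) pair always yields the same UUID, so the
> >     // key is stable across re-processing of the same event.
> >     fun userKey(username: String, domain: String): String =
> >         UUID.nameUUIDFromBytes("$username|$domain".toByteArray(Charsets.UTF_8))
> >             .toString()
> >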
> > My current approach has been to simply build a single transform that
> > handles every entity type at once, to avoid mutating / enriching the event
> > in a separate pass through the pipeline for each one, as such:
> >
> > // Extract all entities (into a PCollectionTuple)
> > val taggedEvents = Pipeline
> >     .create(options)
> >     .apply("Read Events from Kafka",
> >         KafkaIO.read<Event>(options.incomingEventsTopic, options))
> >     .apply("Identify All Entity Instances", Entities.identify())
> >
> > // Users
> > taggedEvents
> >     .get(Entities.Tags.users)
> >     .apply("Flatten Multiple Identified Users", Flatten.iterables())
> >     .apply("Write Users to Kafka",
> >         KafkaIO.write<User>(options.usersTopic, options))
> >
> > // IP Addresses
> > taggedEvents
> >     .get(Entities.Tags.ipAddresses)
> >     .apply("Flatten Multiple Identified IP Addresses", Flatten.iterables())
> >     .apply("Write IP Addresses to Kafka",
> >         KafkaIO.write<IpAddress>(options.ipAddressesTopic, options))
> >
> > // Events (enriched)
> > taggedEvents
> >     .get(Entities.Tags.events)
> >     .apply("Write Enriched Events to Kafka",
> >         KafkaIO.write<Event>(options.identifiedEventsTopic, options))
> >
> > As mentioned, each of these individual extractions for the various
> > entities needs to add the appropriate identifiers onto the original event,
> > so that by the time the last call above is made (sending events to their
> > destination topic) the event already carries those identifiers.
> >
> > Currently my Entities.identify() transform basically does the following
> > behind the scenes:
> >
> >     class Identify() :
> >         PTransform<PCollection<KV<String, Event>>, PCollectionTuple>() {
> >
> >         override fun expand(input: PCollection<KV<String, Event>>): PCollectionTuple {
> >             // Take an event in and fan it out into tagged outputs
> >             return input
> >                 .apply("Extract all available entities",
> >                     ParDo
> >                         .of(ExtractAllEntities())
> >                         .withOutputTags(Tags.events, TupleTagList.of(listOf(
> >                             Entities.Tags.users,
> >                             Entities.Tags.computerEndpoints
> >                         )))
> >                 )
> >         }
> >     }
> >
> >     class ExtractAllEntities() :
> >         TraceableDoFn<KV<String, Event>, KV<String, Event>>() {
> >
> >         @ProcessElement
> >         fun processElement(context: ProcessContext) {
> >             // Get the event (mutable)
> >             val event = context.element().value.toMutable<Event>()
> >
> >             // Process the users
> >             context.output(Entities.Tags.users, Users.extract(event, tracer))
> >
> >             // Process the computer endpoints
> >             context.output(Entities.Tags.computerEndpoints,
> >                 ComputerEndpoints.extract(event, tracer))
> >
> >             // Emit the (now enriched) event itself
> >             context.output(Entities.Tags.events,
> >                 KV.of(context.element().key, event))
> >         }
> >     }
> > }   // closes the enclosing object (not shown) that these classes live in
> >
> > All of these extract() function calls are simply private methods, which
> > seems wrong. That said, creating a single mutable instance of the event
> > lets me perform all of the mutations and extractions in a single pass (as
> > opposed to doing one extraction, pulling out the event, passing it into
> > another transform, and so on ad nauseam).
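> >
> > To make the second question concrete, this is roughly what I mean by a
> > "plain function" version of one of those helpers (the user fields, the
> > setUserId setter, and the omission of the tracer plumbing are all just
> > simplifications for the sketch):
> >
> >     // Derives a deterministic key for each user on the event, writes that
> >     // key back onto the (mutable) event, and returns the keyed users so
> >     // the caller can emit them under the users tag.
> >     object Users {
> >         fun extract(event: Event): List<KV<String, User>> =
> >             event.users.map { user ->
> >                 val key = UUID.nameUUIDFromBytes(
> >                     "${user.name}|${user.domain}".toByteArray(Charsets.UTF_8)
> >                 ).toString()
> >                 user.setUserId(key)  // mutation is visible on the shared event
> >                 KV.of(key, user)
> >             }
> >     }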
> >
> > So I have a few questions:
> >
> > - Is there a preferred pattern for handling something like this?
> > - Should those private methods (e.g. ComputerEndpoints.extract(),
> > Users.extract(), etc.) simply be private functions that take in an event
> > and return a collection of the requested entities? Or would this be better
> > served by a DoFn<T> or some other type of transformation?
> > - In my mind, this could also be written so that each entity-specific
> > transformation yields its own PCollectionTuple (each with the enriched
> > event and a collection of entity instances), with the stages merged or
> > chained at some point. Would there be value in doing that? (A rough sketch
> > of what I mean follows this list.)
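> >
> > For that last bullet, the kind of thing I'm picturing is one small
> > transform per entity type, each enriching the events and tagging its
> > extracted entities, so the stages can be chained off one another. Again,
> > just a sketch with made-up names (IdentifyUsers here, plus the simplified
> > Users.extract helper from above):
> >
> >     // Main output: the (now user-enriched) events. Side output: the users.
> >     // The next entity's transform would be applied to get(Entities.Tags.events),
> >     // so the enrichments accumulate as the stages chain.
> >     class IdentifyUsers :
> >         PTransform<PCollection<KV<String, Event>>, PCollectionTuple>() {
> >
> >         override fun expand(input: PCollection<KV<String, Event>>): PCollectionTuple =
> >             input.apply("Extract Users", ParDo
> >                 .of(object : DoFn<KV<String, Event>, KV<String, Event>>() {
> >                     @ProcessElement
> >                     fun processElement(context: ProcessContext) {
> >                         val event = context.element().value.toMutable<Event>()
> >                         context.output(Entities.Tags.users, Users.extract(event))
> >                         context.output(KV.of(context.element().key, event))
> >                     }
> >                 })
> >                 .withOutputTags(Entities.Tags.events, TupleTagList.of(Entities.Tags.users)))
> >     }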
> >
> > Sorry for the long-winded question. I'm still quite new to the Beam
> > ecosystem and I'm just trying to follow the best practices to take
> > advantage of things like parallelism and some of the other benefits that
> > Beam provides out of the box.
> >
> > Thanks all and I'll be happy to elaborate on any of the steps or provide
> > any additional details as needed.
> >
> > Rion
> >
> >
> >
> 
