On Mon, May 4, 2020 at 2:44 PM Rion Williams <[email protected]> wrote:

> Hi Luke,
>
> I had posed this question originally and it's related to the same basic
> problem (or the same basic pipeline rather), in that I'd be performing a
> series of enrichments against various Kafka-driven sources and was
> wondering the best way to chain those together.
>
> The other question was predominantly around the actual
> "joining/enrichment" process, whereas this one is really focused on the
> tagging / transforms and how to best perform those.
>
> On 2020/05/04 20:57:23, Luke Cwik <[email protected]> wrote:
> > Is this a duplicate of
> > https://lists.apache.org/thread.html/r2272040a06457cfdb867832a61f2933d1a3ba832057cffda89ee248a%40%3Cuser.beam.apache.org%3E
> > ?
> >
> >
> > On Tue, Apr 28, 2020 at 9:26 AM Rion Williams <[email protected]> wrote:
> >
> > > Hi all,
> > >
> > > I'm trying to implement a process and I'm not quite sure what the best
> > > approach to efficiently implement it might be while taking advantage of
> > > Beam's parallelism and recommended patterns. Basically the problem itself
> > > can be summarized as follows:
> > >
> > > I have a series of incoming events which are read from Kafka into my Beam
> > > pipeline. These events are Avro-formatted messages which contain nearly a
> > > hundred different fields with other nested records and values about the
> > > event (e.g. users, ip addresses, etc.). The goal of the pipeline is
> > > twofold:
> > >
> > > - Extract any instances of various entities (e.g. users, ip addresses,
> > > etc.) from the original object, key them (using a deterministic UUID seeded
> > > by all of the known "key" values), and send them off to their own dedicated
> > > Kafka topic (e.g. all extracted users -> users_topic).
> > > - Enrich the original event using the identifiers for all of the extracted
> > > entities (e.g. if the event came in with an array containing two users, the
> > > expectation is that the generated keys would be present on each of those
> > > user instances after leaving the pipeline)
> > >
> > > My current approach has been to simply build a single transform to avoid
> > > mutating / enriching the event throughout the pipeline for each series of
> > > entities as such:
> > >
> > >         // Extract all entities (into a PCollectionTuple)
> > >         val taggedEvents = Pipeline
> > >             .create(options)
> > >             .apply("Read Events from Kafka",
> > >                 KafkaIO.read<Event>(options.incomingEventsTopic, options))
> > >             .apply("Identify All Entity Instances", Entities.identify())
> > >
> > >         // Users
> > >         taggedEvents
> > >             .get(Entities.Tags.users)
> > >             .apply("Flatten Multiple Identified Users", Flatten.iterables())
> > >             .apply("Write Users to Kafka",
> > >                 KafkaIO.write<User>(options.usersTopic, options))
> > >
> > >         // IP Addresses
> > >         taggedEvents
> > >             .get(Entities.Tags.ipAddresses)
> > >             .apply("Flatten Multiple Identified IP Addresses", Flatten.iterables())
> > >             .apply("Write IP Addresses to Kafka",
> > >                 KafkaIO.write<User>(options.ipAddressesTopic, options))
> > >
> > >         // Events (enriched)
> > >         taggedEvents
> > >             .get(Entities.Tags.events)
> > >             .apply("Write Enriched Events to Kafka",
> > >                 KafkaIO.write<Event>(options.identifiedEventsTopic, options))
> > >
> > > As mentioned, each of these individual extractions for various entities
> > > needs to add the appropriate identifiers onto the original event so that
> > > they are present when the last call above is made (sending events to
> > > their destination topic).
> > >
> > > Currently my Entities.identify() transform basically does the following
> > > behind the scenes:
> > >
> > >     class Identify() : PTransform<PCollection<KV<String, Event>>, PCollectionTuple>() {
> > >         override fun expand(input: PCollection<KV<String, Event>>): PCollectionTuple {
> > >             // Take an event in
> > >             return input
> > >                 .apply("Extract all available entities",
> > >                         ParDo
> > >                             .of(ExtractAllEntities())
> > >                             .withOutputTags(Tags.events, TupleTagList.of(listOf(
> > >                                     Entities.Tags.users,
> > >                                     Entities.Tags.computerEndpoints
> > >                             )))
> > >                 )
> > >         }
> > >     }
> > >
> > >     class ExtractAllEntities() : TraceableDoFn<KV<String, Event>, KV<String, Event>>() {
> > >
> > >         @ProcessElement
> > >         fun processElement(context: ProcessContext) {
> > >             // Get the event (mutable)
> > >             val event = context.element().value.toMutable<Event>()
> > >
> > >             // Process the users
> > >             context.output(Entities.Tags.users, Users.extract(event, tracer))
> > >
> > >             // Process the computer endpoints
> > >             context.output(Entities.Tags.computerEndpoints,
> > >                 ComputerEndpoints.extract(event, tracer))
> > >
> > >             // Tag output
> > >             context.output(Entities.Tags.events, KV.of(context.element().key, event))
> > >         }
> > >     }
> > > }
> > >
> > > Where all of these extract() function calls are simply private methods,
> > > which seems wrong. Creating a single instance of a mutable event allows
> > > me to perform all of the mutations and extractions in a single pass
> > > (as opposed to doing one, pulling out the event, passing it into another
> > > transform, repeated ad nauseam).
> > >
> > > So I have a few questions:
> > >
> > > - Is there a preferred pattern for handling something like this?
>

Either use a set of stateful DoFns, plus the transforms needed to get the
right keys, or use an external service to perform the joining.
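
For the stateful option, a minimal sketch in Kotlin might look like the
following. It assumes the data has already been re-keyed by a natural user
key (a plain String here) with String payloads as values. AssignUserId and
deterministicId are hypothetical names, not part of Beam or of your
pipeline. With a purely deterministic ID the state is not strictly needed;
it is shown only to illustrate how per-key state is declared and read.

```kotlin
import java.util.UUID
import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.state.StateSpec
import org.apache.beam.sdk.state.StateSpecs
import org.apache.beam.sdk.state.ValueState
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.DoFn.StateId
import org.apache.beam.sdk.values.KV

// Hypothetical helper: a name-based UUID derived from all known key values,
// so the same inputs always produce the same identifier.
fun deterministicId(vararg keyParts: String): String =
    UUID.nameUUIDFromBytes(keyParts.joinToString("|").toByteArray(Charsets.UTF_8)).toString()

// Hypothetical stateful DoFn. Input: (natural user key, payload).
// Output: (assigned identifier, payload). State is scoped per key and window.
class AssignUserId : DoFn<KV<String, String>, KV<String, String>>() {

    // Declares one String cell of state for each user key.
    @StateId("assignedId")
    private val assignedIdSpec: StateSpec<ValueState<String>> =
        StateSpecs.value(StringUtf8Coder.of())

    @ProcessElement
    fun processElement(
        context: ProcessContext,
        @StateId("assignedId") assignedId: ValueState<String>
    ) {
        val userKey = context.element().key
        // Reuse the identifier stored for this key, or create and store one.
        val id = assignedId.read()
            ?: deterministicId("user", userKey).also { assignedId.write(it) }
        context.output(KV.of(id, context.element().value))
    }
}
```

It would be applied with ParDo.of(AssignUserId()) to a
PCollection<KV<String, String>> keyed by the user key.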


> > > - Should those private methods (e.g. ComputerEndpoints.extract(),
> > > Users.extract(), etc.) simply be private functions that take in an event
> > > and return an array of the requested entities? Or would this be better
> > > served as DoFn<T> or some other type of transformation?
>

It depends on how you decide to enrich the data. If you use a stateful DoFn,
then the "key" for each stateful DoFn will be data like each userId, so
you'll need transforms that perform these manipulations. If you use an
external service, then you should design for whatever query pattern is
optimal for it. If it is something simple like memcache, then it is up to
you, but if it can support a complex query, it would be worthwhile to just
use private functions to extract all the data. Also, you might want to take
a look at GroupIntoBatches if your external service performs better with
some batching of requests.
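
As a rough illustration of the batching idea, here is a sketch that groups
lookups with GroupIntoBatches before calling the service. EnrichmentClient,
enrichBatch, EnrichBatchFn and the batch size of 50 are all assumptions made
for the example; the input is assumed to be (batching key, entity key) pairs
of Strings.

```kotlin
import java.io.Serializable
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.GroupIntoBatches
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection

// Hypothetical client for the external service: one call resolves many keys.
interface EnrichmentClient : Serializable {
    fun lookup(ids: List<String>): Map<String, String>
}

// Pure helper: de-duplicate the batch, issue one request, and keep only the
// keys the service knew about.
fun enrichBatch(ids: Iterable<String>, client: EnrichmentClient): List<KV<String, String>> {
    val unique = ids.distinct()
    val found = client.lookup(unique)
    return unique.mapNotNull { id -> found[id]?.let { value -> KV.of(id, value) } }
}

// Receives one batch per element and emits (entity key, enrichment) pairs.
class EnrichBatchFn(private val client: EnrichmentClient) :
    DoFn<KV<String, Iterable<String>>, KV<String, String>>() {

    @ProcessElement
    fun processElement(context: ProcessContext) {
        enrichBatch(context.element().value, client).forEach { context.output(it) }
    }
}

// Wires the two steps together.
fun enrichInBatches(
    keyed: PCollection<KV<String, String>>,
    client: EnrichmentClient
): PCollection<KV<String, String>> =
    keyed
        .apply("Batch Lookups", GroupIntoBatches.ofSize<String, String>(50L))
        .apply("Query External Service", ParDo.of(EnrichBatchFn(client)))
```

Note that GroupIntoBatches batches per key, so the choice of batching key
determines how many batches can be built in parallel.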


> > > - In my mind, this seems like it could also be written in such a way so
> > > that all of the entity-specific transformations could yield multiple
> > > PCollectionTuples (each with the enriched event and a collection of entity
> > > instances) which could be merged at some point. Would there be value to
> > > doing that?
>

You'll need to do this if you want to use stateful DoFns for enriching and
then joining the data back together. If you use an external service then it
doesn't provide value.
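
One way the "joining back together" step could look is a CoGroupByKey over
the events and the generated identifiers, both keyed by the same event key.
This is only a sketch: the String payloads stand in for your Event type, and
eventTag, entityIdTag, attachIds and MergeEntityIds are hypothetical names.
In a streaming pipeline both inputs would also need compatible windowing
(and a suitable trigger) for the grouping to produce output.

```kotlin
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.ProcessElement
import org.apache.beam.sdk.transforms.ParDo
import org.apache.beam.sdk.transforms.join.CoGbkResult
import org.apache.beam.sdk.transforms.join.CoGroupByKey
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.TupleTag

// Tags that identify each side of the join (ids chosen for the example).
val eventTag = TupleTag<String>("event")
val entityIdTag = TupleTag<String>("entityId")

// Pure helper standing in for "write the identifiers onto the event".
fun attachIds(payload: String, ids: Iterable<String>): String =
    "$payload|ids=${ids.sorted().joinToString(",")}"

// Combines each event with every identifier grouped under the same key.
class MergeEntityIds : DoFn<KV<String, CoGbkResult>, KV<String, String>>() {

    @ProcessElement
    fun processElement(context: ProcessContext) {
        val joined = context.element().value
        val ids = joined.getAll(entityIdTag)
        for (payload in joined.getAll(eventTag)) {
            context.output(KV.of(context.element().key, attachIds(payload, ids)))
        }
    }
}

// Joins the two keyed collections and emits the enriched events.
fun rejoin(
    events: PCollection<KV<String, String>>,
    entityIds: PCollection<KV<String, String>>
): PCollection<KV<String, String>> =
    KeyedPCollectionTuple.of(eventTag, events)
        .and(entityIdTag, entityIds)
        .apply("Group By Event Key", CoGroupByKey.create<String>())
        .apply("Attach Entity Ids", ParDo.of(MergeEntityIds()))
```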


> > >
> > > Sorry for the long-winded question. I'm still quite new to the Beam
> > > ecosystem and I'm just trying to follow the best practices to take
> > > advantage of things like parallelism and some of the other benefits that
> > > Beam provides out of the box.
> > >
> > > Thanks all and I'll be happy to elaborate on any of the steps or provide
> > > any additional details as needed.
> > >
> > > Rion
> > >
> > >
> > >
> >
>
