Is this a duplicate of https://lists.apache.org/thread.html/r2272040a06457cfdb867832a61f2933d1a3ba832057cffda89ee248a%40%3Cuser.beam.apache.org%3E ?
On Tue, Apr 28, 2020 at 9:26 AM Rion Williams <[email protected]> wrote:

> Hi all,
>
> I'm trying to implement a process and I'm not quite sure of the best way
> to implement it efficiently while taking advantage of Beam's parallelism
> and recommended patterns. The problem can be summarized as follows:
>
> I have a series of incoming events which are read from Kafka into my Beam
> pipeline. These events are Avro-formatted messages which contain nearly a
> hundred different fields, along with nested records and values about the
> event (e.g. users, ip addresses, etc.). The goal of the pipeline is
> twofold:
>
> - Extract any instances of various entities (e.g. users, ip addresses,
>   etc.) from the original object, key them (using a deterministic UUID
>   seeded by all of the known "key" values), and send them off to their own
>   dedicated Kafka topic (e.g. all extracted users -> users_topic).
> - Enrich the original event with the identifiers of all of the extracted
>   entities (e.g. if the event came in with an array containing two users,
>   the expectation is that the generated keys would be present on each of
>   those user instances after leaving the pipeline).
>
> My current approach has been to simply build a single transform, to avoid
> mutating / enriching the event at multiple points throughout the pipeline
> (once for each series of entities), as such:
>
>     // Extract all entities (into a PCollectionTuple)
>     val taggedEvents = Pipeline
>         .create(options)
>         .apply("Read Events from Kafka",
>             KafkaIO.read<Event>(options.incomingEventsTopic, options))
>         .apply("Identify All Entity Instances", Entities.identify())
>
>     // Users
>     taggedEvents
>         .get(Entities.Tags.users)
>         .apply("Flatten Multiple Identified Users", Flatten.iterables())
>         .apply("Write Users to Kafka",
>             KafkaIO.write<User>(options.usersTopic, options))
>
>     // IP Addresses
>     taggedEvents
>         .get(Entities.Tags.ipAddresses)
>         .apply("Flatten Multiple Identified IP Addresses", Flatten.iterables())
>         .apply("Write IP Addresses to Kafka",
>             KafkaIO.write<IpAddress>(options.ipAddressesTopic, options))
>
>     // Events (enriched)
>     taggedEvents
>         .get(Entities.Tags.events)
>         .apply("Write Enriched Events to Kafka",
>             KafkaIO.write<Event>(options.identifiedEventsTopic, options))
>
> As mentioned, each of these individual extractions for the various entities
> needs to add the appropriate identifiers onto the original event, so that
> when the last call above is made (sending events to their destination
> topic), the enriched event already carries the generated keys for every
> extracted entity.
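As a concrete illustration of the "deterministic UUID seeded by all of the known 'key' values" mentioned above, one way to derive such a key is java.util.UUID.nameUUIDFromBytes over a canonical concatenation of the key fields. This is only a sketch; the User fields below are made up and are not from the original pipeline:

    import java.util.UUID
    import org.apache.beam.sdk.values.KV

    // Hypothetical stand-in for an extracted entity; the real User record in
    // the pipeline has many more fields.
    data class User(val username: String, val domain: String)

    // Derives a stable, deterministic UUID from the entity's "key" fields so
    // the same user always maps to the same identifier, regardless of which
    // event it was extracted from.
    fun keyedUser(user: User): KV<String, User> {
        val seed = "${user.domain}|${user.username}"
        val id = UUID.nameUUIDFromBytes(seed.toByteArray(Charsets.UTF_8)).toString()
        return KV.of(id, user)
    }

Since nameUUIDFromBytes produces a name-based (version 3) UUID, identical seeds yield identical identifiers across events and across pipeline runs.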
> Currently my Entities.identify() transform basically does the following
> behind the scenes:
>
>     class Identify : PTransform<PCollection<KV<String, Event>>, PCollectionTuple>() {
>         override fun expand(input: PCollection<KV<String, Event>>): PCollectionTuple {
>             // Take an event in and fan it out to the per-entity output tags
>             return input
>                 .apply("Extract all available entities",
>                     ParDo
>                         .of(ExtractAllEntities())
>                         .withOutputTags(Entities.Tags.events,
>                             TupleTagList.of(listOf(
>                                 Entities.Tags.users,
>                                 Entities.Tags.computerEndpoints
>                             )))
>                 )
>         }
>     }
>
>     class ExtractAllEntities : TraceableDoFn<KV<String, Event>, KV<String, Event>>() {
>
>         @ProcessElement
>         fun processElement(context: ProcessContext) {
>             // Get a mutable copy of the event
>             val event = context.element().value.toMutable<Event>()
>
>             // Process the users
>             context.output(Entities.Tags.users, Users.extract(event, tracer))
>
>             // Process the computer endpoints
>             context.output(Entities.Tags.computerEndpoints,
>                 ComputerEndpoints.extract(event, tracer))
>
>             // Emit the (now enriched) event on the main output tag
>             context.output(Entities.Tags.events, KV.of(context.element().key, event))
>         }
>     }
>
> All of these extract() calls are simply private methods, which seems wrong.
> By creating a single mutable instance of the event, I can perform all of
> the mutations and extractions in a single pass (as opposed to extracting
> one entity type, pulling out the event, passing it into another transform,
> and repeating ad nauseam).
>
> So I have a few questions:
>
> - Is there a preferred pattern for handling something like this?
> - Should those private methods (e.g. ComputerEndpoints.extract(),
>   Users.extract(), etc.) simply be private functions that take in an event
>   and return an array of the requested entities? Or would this be better
>   served by a DoFn<T> or some other type of transformation?
> - In my mind, this could also be written so that each entity-specific
>   transformation yields its own PCollectionTuple (each with the enriched
>   event and a collection of entity instances), which could be merged at
>   some point. Would there be value in doing that?
>
> Sorry for the long-winded question. I'm still quite new to the Beam
> ecosystem and I'm just trying to follow the best practices to take
> advantage of things like parallelism and the other benefits that Beam
> provides out of the box.
>
> Thanks all, and I'll be happy to elaborate on any of the steps or provide
> any additional details as needed.
>
> Rion
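On the second question above, one possible shape for a helper like Users.extract() is a plain function that stamps the generated identifiers onto the mutable event (the enrichment step) and returns the keyed users for the enclosing DoFn to emit on the users tag. The types below are simplified stand-ins (the real Event is a rich Avro record and the tracer argument is omitted), so treat this purely as a sketch of the idea, not the original implementation:

    import java.util.UUID
    import org.apache.beam.sdk.values.KV

    // Simplified stand-ins for the pipeline's types, only to show the shape
    // of the pattern; the real Event/User records are far richer than this.
    data class User(var id: String? = null, val username: String = "", val domain: String = "")
    data class MutableEvent(val users: MutableList<User> = mutableListOf())

    object Users {
        // Keys each user deterministically, writes the id back onto the
        // event's user instances, and returns the keyed users so the DoFn
        // can output them to the users tag in the same pass.
        fun extract(event: MutableEvent): List<KV<String, User>> =
            event.users.map { user ->
                val key = UUID.nameUUIDFromBytes(
                    "${user.domain}|${user.username}".toByteArray(Charsets.UTF_8)
                ).toString()
                user.id = key
                KV.of(key, user)
            }
    }

Keeping the per-entity logic in plain functions like this leaves the single multi-output ParDo as the only place that touches Beam's tagging machinery, which is one way to reconcile "single pass over a mutable event" with small, testable extraction units.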

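For completeness, the KafkaIO.read<Event>(topic, options) and KafkaIO.write<...>(topic, options) calls in the snippets above appear to be project-specific wrappers; a plain Beam KafkaIO read that such a wrapper might hide would look roughly like the sketch below. The broker address, topic name, and the Avro value deserializer are all assumptions here (EventAvroDeserializer is a hypothetical Deserializer<Event>, not part of Beam or the original post):

    import org.apache.beam.sdk.io.kafka.KafkaIO
    import org.apache.kafka.common.serialization.StringDeserializer

    // A rough sketch of an unwrapped KafkaIO read for the incoming events topic.
    val readEvents = KafkaIO.read<String, Event>()
        .withBootstrapServers("kafka-broker:9092")             // assumed broker address
        .withTopic("incoming_events")                          // assumed topic name
        .withKeyDeserializer(StringDeserializer::class.java)
        .withValueDeserializer(EventAvroDeserializer::class.java)
        .withoutMetadata()                                     // yields PCollection<KV<String, Event>>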