Hi Luke, I had posed that question originally, and it's related to the same basic problem (or the same basic pipeline, rather): I'd be performing a series of enrichments against various Kafka-driven sources and was wondering about the best way to chain those together.
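To make that a bit more concrete, the shape I have in mind is roughly a chain of enrichment transforms between the Kafka read and the final write, something like the following (purely a sketch: EnrichWithUsers and EnrichWithIpAddresses are hypothetical placeholder transforms, and the KafkaIO calls reuse the same helper wrappers that appear in the quoted code below rather than the raw KafkaIO builder):

// Sketch only: each Enrich* step is a placeholder
// PTransform<PCollection<KV<String, Event>>, PCollection<KV<String, Event>>>
// that keys its entities and stamps the resulting identifiers back onto the
// event (and could also write the extracted entities to their own topic
// internally).
val pipeline = Pipeline.create(options)

pipeline
    .apply("Read Events from Kafka",
        KafkaIO.read<Event>(options.incomingEventsTopic, options))
    .apply("Enrich with User Identifiers", EnrichWithUsers())
    .apply("Enrich with IP Address Identifiers", EnrichWithIpAddresses())
    .apply("Write Enriched Events to Kafka",
        KafkaIO.write<Event>(options.identifiedEventsTopic, options))

pipeline.run()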
The other question was predominantly about the actual "joining/enrichment" process, whereas this one is really focused on the tagging / transforms and how best to perform those. (I've also put a rough sketch of the extraction shape I'm leaning toward at the bottom of this message, below the quoted thread.)

On 2020/05/04 20:57:23, Luke Cwik <[email protected]> wrote:
> Is this a duplicate of
> https://lists.apache.org/thread.html/r2272040a06457cfdb867832a61f2933d1a3ba832057cffda89ee248a%40%3Cuser.beam.apache.org%3E
> ?
>
> On Tue, Apr 28, 2020 at 9:26 AM Rion Williams <[email protected]> wrote:
>
> > Hi all,
> >
> > I'm trying to implement a process, and I'm not quite sure what the best
> > approach might be to implement it efficiently while taking advantage of
> > Beam's parallelism and recommended patterns. Basically, the problem can be
> > summarized as follows:
> >
> > I have a series of incoming events which are read from Kafka into my Beam
> > pipeline. These events are Avro-formatted messages which contain nearly a
> > hundred different fields, along with nested records and values about the
> > event (e.g. users, IP addresses, etc.). The goal of the pipeline is twofold:
> >
> > - Extract any instances of various entities (e.g. users, IP addresses,
> > etc.) from the original object, key them (using a deterministic UUID seeded
> > by all of the known "key" values), and send them off to their own dedicated
> > Kafka topic (e.g. all extracted users -> users_topic).
> > - Enrich the original event using the identifiers for all of the extracted
> > entities (e.g. if the event came in with an array containing two users, the
> > expectation is that the generated keys would be present on each of those
> > user instances after leaving the pipeline).
> >
> > My current approach has been to build a single transform, so as to avoid
> > mutating / enriching the event in a separate pass through the pipeline for
> > each kind of entity:
> >
> > // Extract all entities (into a PCollectionTuple)
> > val taggedEvents = Pipeline
> >     .create(options)
> >     .apply("Read Events from Kafka",
> >         KafkaIO.read<Event>(options.incomingEventsTopic, options))
> >     .apply("Identify All Entity Instances", Entities.identify())
> >
> > // Users
> > taggedEvents
> >     .get(Entities.Tags.users)
> >     .apply("Flatten Multiple Identified Users", Flatten.iterables())
> >     .apply("Write Users to Kafka",
> >         KafkaIO.write<User>(options.usersTopic, options))
> >
> > // IP Addresses
> > taggedEvents
> >     .get(Entities.Tags.ipAddresses)
> >     .apply("Flatten Multiple Identified IP Addresses", Flatten.iterables())
> >     .apply("Write IP Addresses to Kafka",
> >         KafkaIO.write<IpAddress>(options.ipAddressesTopic, options))
> >
> > // Events (enriched)
> > taggedEvents
> >     .get(Entities.Tags.events)
> >     .apply("Write Enriched Events to Kafka",
> >         KafkaIO.write<Event>(options.identifiedEventsTopic, options))
> >
> > As mentioned, each of these individual extractions for the various entities
> > needs to add the appropriate identifiers onto the original event so that,
> > when the last call above is made (sending the enriched events to their
> > destination topic), those identifiers are already present.
> >
> > Currently, my Entities.identify() transform basically does the following
> > behind the scenes:
> >
> > class Identify() : PTransform<PCollection<KV<String, Event>>, PCollectionTuple>() {
> >     override fun expand(input: PCollection<KV<String, Event>>): PCollectionTuple {
> >         // Take an event in
> >         return input
> >             .apply("Extract all available entities",
> >                 ParDo
> >                     .of(ExtractAllEntities())
> >                     .withOutputTags(Tags.events,
> >                         TupleTagList.of(listOf(
> >                             Entities.Tags.users,
> >                             Entities.Tags.computerEndpoints
> >                         )))
> >             )
> >     }
> > }
> >
> > class ExtractAllEntities() : TraceableDoFn<KV<String, Event>, KV<String, Event>>() {
> >     @ProcessElement
> >     fun processElement(context: ProcessContext) {
> >         // Get the event (mutable)
> >         val event = context.element().value.toMutable<Event>()
> >
> >         // Process the users
> >         context.output(Entities.Tags.users, Users.extract(event, tracer))
> >
> >         // Process the computer endpoints
> >         context.output(Entities.Tags.computerEndpoints,
> >             ComputerEndpoints.extract(event, tracer))
> >
> >         // Tag output
> >         context.output(Entities.Tags.events,
> >             KV.of(context.element().key, event))
> >     }
> > }
> >
> > Here all of these extract() calls are simply private methods, which seems
> > wrong. That said, working against a single mutable event instance lets me
> > perform all of the mutations and extractions in a single pass (as opposed
> > to doing one extraction, pulling the event out, passing it into another
> > transform, and so on ad nauseam).
> >
> > So I have a few questions:
> >
> > - Is there a preferred pattern for handling something like this?
> > - Should those private methods (e.g. ComputerEndpoints.extract(),
> > Users.extract(), etc.) simply be private functions that take in an event
> > and return an array of the requested entities? Or would this be better
> > served by a DoFn<T> or some other type of transformation?
> > - In my mind, this seems like it could also be written in such a way that
> > all of the entity-specific transformations could yield multiple
> > PCollectionTuples (each with the enriched event and a collection of entity
> > instances) which could be merged at some point. Would there be value in
> > doing that?
> >
> > Sorry for the long-winded question. I'm still quite new to the Beam
> > ecosystem and I'm just trying to follow the best practices to take
> > advantage of things like parallelism and some of the other benefits that
> > Beam provides out of the box.
> >
> > Thanks all, and I'll be happy to elaborate on any of the steps or provide
> > any additional details as needed.
> >
> > Rion
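
Coming back to the second question above (whether the per-entity extract() calls should stay private helpers or become DoFns / transforms of their own), the rough sketch I mentioned at the top is below. The idea is to keep the single multi-output ParDo, but make each per-entity extractor a pure function that returns both the keyed entities and a copy of the event with those identifiers applied, instead of everything sharing one mutable event. To be clear, everything here (ExtractionResult, withKey, withUsers, deterministicUuid, etc.) is just a placeholder to illustrate the shape, not code that exists in the pipeline today:

import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.values.KV

// Placeholder container: the extracted (keyed) entities plus a copy of the
// event with their identifiers stamped onto it.
data class ExtractionResult<T>(val entities: List<T>, val event: Event)

object Users {
    // Pure function: no shared mutable event, so it can be unit tested (and
    // reused) outside of any DoFn.
    fun extract(event: Event): ExtractionResult<User> {
        val keyed = event.users.map { it.withKey(deterministicUuid(it)) }
        return ExtractionResult(keyed, event.withUsers(keyed))
    }
}

class ExtractAllEntities : DoFn<KV<String, Event>, KV<String, Event>>() {
    @ProcessElement
    fun processElement(context: ProcessContext) {
        // Thread the event through each extractor, accumulating identifiers.
        val (users, withUsers) = Users.extract(context.element().value)
        val (endpoints, enriched) = ComputerEndpoints.extract(withUsers)

        // Side outputs per entity type (still flattened downstream as before).
        context.output(Entities.Tags.users, users)
        context.output(Entities.Tags.computerEndpoints, endpoints)

        // Main output: the fully enriched event.
        context.output(Entities.Tags.events, KV.of(context.element().key, enriched))
    }
}

It still does everything in one pass through the ParDo, but the per-entity pieces become small, composable functions rather than mutations against a shared event, which also seems like it would leave the door open to promoting any one of them to its own transform later if that turns out to be the better pattern. Does that direction seem reasonable, or is there a more idiomatic Beam way to structure it?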
