Hi all,
I'm trying to implement a process in Beam and I'm not quite sure of the best
way to do it efficiently while taking advantage of Beam's parallelism and
recommended patterns. The problem can be summarized as follows:
I have a series of incoming events which are read from Kafka into my Beam
pipeline. These events are Avro-formatted messages which contain nearly a
hundred different fields, along with nested records and values about the event
(e.g. users, IP addresses, etc.). The goal of the pipeline is twofold:
- Extract any instances of various entities (e.g. users, IP addresses, etc.)
from the original object, key them (using a deterministic UUID seeded by all of
the known "key" values; see the keying sketch after this list), and send them
off to their own dedicated Kafka topic (e.g. all extracted users ->
users_topic).
- Enrich the original event using the identifiers for all of the extracted
entities (e.g. if the event came in with an array containing two users, the
expectation is that the generated keys would be present on each of those user
instances after leaving the pipeline)
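For what it's worth, the deterministic keying I have in mind is essentially a
name-based UUID built from the entity's known key fields, roughly along these
lines (the User data class and its fields below are just illustrative stand-ins
for the real Avro types):

    import java.util.UUID

    // Stand-in for the real Avro-generated user record; field names here are
    // illustrative only.
    data class User(val username: String, val domain: String)

    // Deterministic, name-based UUID seeded by the entity's known "key" values,
    // so the same user always resolves to the same identifier across events.
    fun userKey(user: User): String {
        val seed = "${user.username}|${user.domain}"
        return UUID.nameUUIDFromBytes(seed.toByteArray(Charsets.UTF_8)).toString()
    }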
My current approach has been to build a single transform that performs all of
the extraction and enrichment in one pass, rather than mutating / enriching the
event repeatedly throughout the pipeline for each kind of entity:
// Extract all entities (into a PCollectionTuple)
val taggedEvents = Pipeline
    .create(options)
    .apply("Read Events from Kafka",
        KafkaIO.read<Event>(options.incomingEventsTopic, options))
    .apply("Identify All Entity Instances", Entities.identify())

// Users
taggedEvents
    .get(Entities.Tags.users)
    .apply("Flatten Multiple Identified Users", Flatten.iterables())
    .apply("Write Users to Kafka",
        KafkaIO.write<User>(options.usersTopic, options))

// IP Addresses
taggedEvents
    .get(Entities.Tags.ipAddresses)
    .apply("Flatten Multiple Identified IP Addresses", Flatten.iterables())
    .apply("Write IP Addresses to Kafka",
        KafkaIO.write<IpAddress>(options.ipAddressesTopic, options))
// Events (enriched)
taggedEvents
    .get(Entities.Tags.events)
    .apply("Write Enriched Events to Kafka",
        KafkaIO.write<Event>(options.identifiedEventsTopic, options))
As mentioned, each of these individual entity extractions needs to add the
appropriate identifiers onto the original event, so that by the time the last
call above is made (sending the enriched events to their destination topic),
all of the generated keys are present on the event.
Currently my Entities.identify() transform basically does the following behind
the scenes:
class Identify() : PTransform<PCollection<KV<String, Event>>, PCollectionTuple>() {
    override fun expand(input: PCollection<KV<String, Event>>): PCollectionTuple {
        // Take an event in
        return input
            .apply("Extract all available entities",
                ParDo
                    .of(ExtractAllEntities())
                    .withOutputTags(Entities.Tags.events, TupleTagList.of(listOf(
                        Entities.Tags.users,
                        Entities.Tags.computerEndpoints
                    )))
            )
    }
}

class ExtractAllEntities() : TraceableDoFn<KV<String, Event>, KV<String, Event>>() {
    @ProcessElement
    fun processElement(context: ProcessContext) {
        // Get the event (mutable)
        val event = context.element().value.toMutable<Event>()
        // Process the users
        context.output(Entities.Tags.users, Users.extract(event, tracer))
        // Process the computer endpoints
        context.output(Entities.Tags.computerEndpoints,
            ComputerEndpoints.extract(event, tracer))
        // Tag output
        context.output(Entities.Tags.events, KV.of(context.element().key, event))
    }
}
All of these extract() calls are simply private methods, which seems wrong.
Creating a single mutable instance of the event lets me perform all of the
mutations and extractions in a single pass (as opposed to doing one extraction,
pulling out the event, passing it into another transform, and so on ad
nauseam).
So I have a few questions:
- Is there a preferred pattern for handling something like this?
- Should those private methods (e.g. ComputerEndpoints.extract(),
Users.extract(), etc.) simply be private functions that take in an event and
return an array of the requested entities? Or would this be better served by a
DoFn<T> or some other type of transform? (A rough sketch follows these
questions.)
- In my mind, this could also be written so that each of the entity-specific
transformations yields its own PCollectionTuple (each with the enriched event
and a collection of entity instances), which could be merged at some point.
Would there be value in doing that?
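To make the second question a bit more concrete, here's roughly the kind of
DoFn-per-entity shape I mean (just a sketch; the signatures are simplified
compared to what I actually have):

    class ExtractUsersFn : TraceableDoFn<KV<String, Event>, User>() {
        @ProcessElement
        fun processElement(context: ProcessContext) {
            // Work on a mutable copy of the event and emit each extracted user
            // individually, instead of collecting them through a private helper.
            val event = context.element().value.toMutable<Event>()
            Users.extract(event, tracer).forEach { context.output(it) }
        }
    }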
Sorry for the long-winded question. I'm still quite new to the Beam ecosystem
and I'm just trying to follow best practices to take advantage of things like
parallelism and the other benefits that Beam provides out of the box.
Thanks all and I'll be happy to elaborate on any of the steps or provide any
additional details as needed.
Rion