Good morning Salva,
The situation is much better than you are apparently aware of.
For quite some time now, there has been an implementation for keyed operators
with as many inputs as you like:
* MultipleInputStreamOperator/KeyedMultipleInputTransformation
I originally used your proposed sum types with unions; that worked fine but was
far from elegant.
I posted a sketch of how to implement a MultipleInputStreamOperator a while ago
and found it again (from around August 8, 2024).
Here are the essentials once more:
Hi Christian,
As I didn't find a well-documented reference to MultipleInputStreamOperator,
let me sketch it (it's Scala code, but easy enough to translate to Java):
* Implements 4 inputs; the second one is a broadcast stream
* ACHEP, CEIEP, V1EP, AMAP are type aliases for the respective input types;
  O and K are the output type and the key type
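The sketch leaves these aliases abstract. For anyone who wants to compile it, they could be bound roughly as follows. Only the field names yourId and cardHolderId come from the sketch; the case classes, their other fields and the choice of String for K are hypothetical. The plain functions mirror the KeySelectors in the job setup and illustrate the main constraint: every keyed input has to produce the same key type K.

```scala
// Hypothetical payload types; only yourId and cardHolderId appear in the sketch.
final case class AchepEvent(yourId: String, amount: Long)
final case class CeiepEvent(ruleId: String, threshold: Long)
final case class V1epEvent(cardHolderId: String, score: Double)
final case class AmapEvent(yourId: String, region: String)
final case class EnrichedEvent(key: String, amount: Long, region: String)

object EnrichmentTypes {
  type K = String          // common key type of all keyed inputs (assumed)
  type O = EnrichedEvent   // output type of the operator (assumed)
  type ACHEP = AchepEvent  // input 1, keyed
  type CEIEP = CeiepEvent  // input 2, broadcast (no key)
  type V1EP = V1epEvent    // input 3, keyed
  type AMAP = AmapEvent    // input 4, keyed

  // Plain-function counterparts of the KeySelectors in the job setup:
  // different fields, but all of them yield the same key type K.
  val achepKey: ACHEP => K = _.yourId
  val v1epKey: V1EP => K = _.cardHolderId
  val amapKey: AMAP => K = _.yourId
}
```

Records from different inputs that carry the same key value (here "c-42" in yourId and in cardHolderId) are routed to the same keyed operator instance and share its keyed state.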
//imports assumed by the sketch
import java.util
import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
import org.apache.flink.streaming.api.operators._
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

//implement the operator
class YourEnrichmentOperator(params: StreamOperatorParameters[O])
    //specify number of inputs (4)
    extends AbstractStreamOperatorV2[O](params, 4)
    with MultipleInputStreamOperator[O]
    with Triggerable[K, VoidNamespace] {

  @transient var collector: TimestampedCollector[O] = null
  @transient var internalTimerService: InternalTimerService[VoidNamespace] = null

  override def open(): Unit = {
    super.open()
    require(configuration != null, "Missing configuration")
    //setup collector and timer services (modelled after other operators)
    collector = new TimestampedCollector[O](output)
    internalTimerService = getInternalTimerService(
      "user-timers",
      VoidNamespaceSerializer.INSTANCE,
      this
    )
  }

  //implement each input
  val input1achep =
    new AbstractInput[ACHEP, O](this, 1) {
      override def processElement(element: StreamRecord[ACHEP]): Unit = {
        val achep: ACHEP = element.getValue
        val et = element.getTimestamp
        val key = getCurrentKey.asInstanceOf[K]
        collector.setAbsoluteTimestamp(et)
        ...
      }
    }
  val input2ceiep =
    new AbstractInput[CEIEP, O](this, 2) {
      ...
    }
  val input3v1ep =
    new AbstractInput[V1EP, O](this, 3) {
      ...
    }
  val input4amap =
    new AbstractInput[AMAP, O](this, 4) {
      ...
    }

  //implement getInputs(...) with all configured inputs
  override def getInputs: util.List[Input[_]] =
    util.List.of[Input[_]](input1achep, input2ceiep, input3v1ep, input4amap)

  //Triggerable callbacks for timers registered via internalTimerService
  override def onEventTime(timer: InternalTimer[K, VoidNamespace]): Unit = {
    ...
  }

  override def onProcessingTime(timer: InternalTimer[K, VoidNamespace]): Unit = {
    ...
  }
}
//implement the operator factory
class YourEnrichmentOperatorFactory extends AbstractStreamOperatorFactory[O] {

  override def createStreamOperator[T <: StreamOperator[O]](
      parameters: StreamOperatorParameters[O]): T = {
    val operator = new YourEnrichmentOperator(parameters)
    operator.asInstanceOf[T]
  }

  //classOf yields the operator class itself
  //(YourEnrichmentOperator.getClass would require a companion object)
  override def getStreamOperatorClass(
      classLoader: ClassLoader): Class[_ <: StreamOperator[_]] =
    classOf[YourEnrichmentOperator]
}
//implement job setup
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation
...
def setupJobFromInputs(achep: DataStream[ACHEP],
                       ceiep: DataStream[CEIEP],
                       v1ep: DataStream[V1EP],
                       amap: DataStream[AMAP]) = {
  val par = your.parallelism
  val factory = new YourEnrichmentOperatorFactory
  //oTI and kTI: TypeInformation of the output type O and of the key type K
  val yourEnrichmentTransformation =
    new KeyedMultipleInputTransformation[O](
      "yourEnrichment",
      factory,
      oTI,
      par,
      kTI
    )

  //add the inputs in the same order as the input ids 1..4 used in the operator

  //input 1: keyed by yourId
  val achepKS = new KeySelector[ACHEP, K] {
    override def getKey(value: ACHEP): K = value.yourId
  }
  yourEnrichmentTransformation.addInput(achep.javaStream.getTransformation, achepKS)

  //input 2: broadcast, hence no key selector
  yourEnrichmentTransformation.addInput(ceiep.broadcast.javaStream.getTransformation, null)

  //input 3: keyed by cardHolderId
  val v1epKS = new KeySelector[V1EP, K] {
    override def getKey(value: V1EP): K = value.cardHolderId
  }
  yourEnrichmentTransformation.addInput(v1ep.javaStream.getTransformation, v1epKS)

  //input 4: keyed by yourId
  val amapKS = new KeySelector[AMAP, K] {
    override def getKey(value: AMAP): K = value.yourId
  }
  yourEnrichmentTransformation.addInput(amap.javaStream.getTransformation, amapKS)

  //wrap the transformation into a (Scala) DataStream
  val yourEnrichedStream =
    new DataStream[O](
      new MultipleConnectedStreams(env.getJavaEnv)
        .transform(yourEnrichmentTransformation)
    ).withScopedMetaData("yourEnriched")
  yourEnrichedStream
}
...
I hope this helps.
Flink-Greetings
Thias
From: Salva Alcántara <[email protected]>
Sent: Wednesday, December 4, 2024 2:03 PM
To: user <[email protected]>
Subject: [External] Joining Streams: "One operator with N inputs" vs "N-1
co-processors"
I have a job which basically joins different inputs together, all partitioned
by the same key.
I originally took the typical approach and created a pipeline consisting of N-1
successive joins, each one implemented using a DataStream co-process function.
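To make the N-1 design concrete, here is a plain-Scala model (no Flink; class and input names are hypothetical) of two chained two-input joins over three inputs A, B and C. Each stage keeps the latest value per key on each side, roughly what a keyed co-process function with one value state per side might do. Note that the second stage stores the already-joined (A, B) pairs, so A and B values end up held in state twice; this is the kind of state duplication referred to in the next paragraph.

```scala
import scala.collection.mutable

// Simplified model of one keyed two-input join stage: it remembers the latest
// left and right value per key and emits a pair whenever both sides are known.
final class PairJoin[K, L, R] {
  private val lefts = mutable.Map.empty[K, L]
  private val rights = mutable.Map.empty[K, R]

  def onLeft(key: K, l: L): Option[(L, R)] = {
    lefts(key) = l
    rights.get(key).map(r => (l, r))
  }

  def onRight(key: K, r: R): Option[(L, R)] = {
    rights(key) = r
    lefts.get(key).map(l => (l, r))
  }

  // Number of values currently held in this stage's state.
  def stateSize: Int = lefts.size + rights.size
}

object ChainedJoins {
  type Out = ((String, Int), Double)

  // N = 3 inputs (A, B, C) need N - 1 = 2 join stages.
  val stage1 = new PairJoin[String, String, Int]           // A join B
  val stage2 = new PairJoin[String, (String, Int), Double] // (A join B) join C

  // Stage-1 results are fed into the left side of stage 2.
  private def forward(key: String, ab: Option[(String, Int)]): Option[Out] =
    ab.flatMap(pair => stage2.onLeft(key, pair))

  def onA(key: String, a: String): Option[Out] = forward(key, stage1.onLeft(key, a))
  def onB(key: String, b: Int): Option[Out] = forward(key, stage1.onRight(key, b))
  def onC(key: String, c: Double): Option[Out] = stage2.onRight(key, c)
}
```

Feeding "a", 7 and 1.5 for the same key emits (("a", 7), 1.5) only on the last call, and by then stage 2 holds its own copy of the ("a", 7) pair in addition to the two values kept in stage 1.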
To avoid shuffling, as well as some state duplication across operators, I am now
considering the following alternative design:
- Collapse the whole pipeline into a single (fat) operator
- This operator will, in effect, process all the inputs
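A plain-Scala model (no Flink, hypothetical names) of the single-operator design: one state entry per key holds the latest value from every input, and each input has its own handler, loosely analogous to one AbstractInput per input id in a MultipleInputStreamOperator. No intermediate join results are stored, which is where the state savings would come from.

```scala
import scala.collection.mutable

// Per-key state of the single operator: latest value seen on each input.
final case class KeyState(a: Option[String] = None,
                          b: Option[Int] = None,
                          c: Option[Double] = None) {
  // A joined result exists only once every input has contributed for this key.
  def joined: Option[(String, Int, Double)] =
    for { x <- a; y <- b; z <- c } yield (x, y, z)
}

final class FatJoin {
  private val state = mutable.Map.empty[String, KeyState]

  // Apply an update to the key's state and emit the joined result if complete.
  private def update(key: String)(f: KeyState => KeyState): Option[(String, Int, Double)] = {
    val next = f(state.getOrElse(key, KeyState()))
    state(key) = next
    next.joined
  }

  // One handler per input, loosely like one AbstractInput per input id.
  def onA(key: String, v: String): Option[(String, Int, Double)] = update(key)(_.copy(a = Some(v)))
  def onB(key: String, v: Int): Option[(String, Int, Double)] = update(key)(_.copy(b = Some(v)))
  def onC(key: String, v: Double): Option[(String, Int, Double)] = update(key)(_.copy(c = Some(v)))

  def keysInState: Int = state.size
}
```

Once a key is complete, every further update for it emits a refreshed result; whether that is the desired behaviour depends on the join semantics of the actual job.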
Since Flink does not support side inputs yet, they need to be simulated, e.g.,
by unioning all the different inputs into a sum type (a tuple or a POJO with
one field for each type of input).
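A minimal sketch of that encoding in Scala, assuming three hypothetical input types: a sealed trait provides the sum type, each input is mapped into it before the union, and the single downstream function dispatches by pattern matching. The POJO variant described above (one optional field per input type) appears as Envelope for comparison. With Flink, the tagging and merging would be done with map and union on the DataStreams; here plain collections stand in for the streams.

```scala
// Hypothetical input types, all carrying the same key.
final case class Order(key: String, amount: Long)
final case class Payment(key: String, paid: Long)
final case class Shipment(key: String, carrier: String)

// Sum type: exactly one case per input stream.
sealed trait JoinInput { def key: String }
final case class FromOrder(v: Order) extends JoinInput { def key: String = v.key }
final case class FromPayment(v: Payment) extends JoinInput { def key: String = v.key }
final case class FromShipment(v: Shipment) extends JoinInput { def key: String = v.key }

// POJO-style alternative: one optional field per input type, exactly one set.
final case class Envelope(order: Option[Order] = None,
                          payment: Option[Payment] = None,
                          shipment: Option[Shipment] = None)

object SumTypeUnion {
  // Stand-in for tagging each stream with map and merging them with union.
  def union(orders: Seq[Order], payments: Seq[Payment], shipments: Seq[Shipment]): Seq[JoinInput] =
    orders.map(FromOrder(_)) ++ payments.map(FromPayment(_)) ++ shipments.map(FromShipment(_))

  // The single operator dispatches on the case.
  def describe(in: JoinInput): String = in match {
    case FromOrder(o)    => s"order ${o.amount}"
    case FromPayment(p)  => s"payment ${p.paid}"
    case FromShipment(s) => s"shipment via ${s.carrier}"
  }

  // Conversion to the POJO-style encoding.
  def toEnvelope(in: JoinInput): Envelope = in match {
    case FromOrder(o)    => Envelope(order = Some(o))
    case FromPayment(p)  => Envelope(payment = Some(p))
    case FromShipment(s) => Envelope(shipment = Some(s))
  }
}
```

With the sealed trait, adding a fourth input means adding one case, and the compiler's exhaustiveness warnings then point at every match that still needs a branch; the Envelope encoding offers no such check.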
Has anyone experimented with these two (somewhat dual) approaches? If so, could
you provide some guidance on deciding which one to use?
On a related note, are there any plans to move
FLIP-17<https://cwiki.apache.org/confluence/display/FLINK/FLIP-17%3A+Side+Inputs+for+DataStream+API>
forward?
Regards,
Salva