Hi Christian,
As I couldn’t find a well-documented reference for MultipleInputStreamOperator,
let me sketch one (it’s Scala code, but easy enough to translate to Java):
* It implements 4 inputs; the second is a broadcast stream
* ACHEP, CEIEP, V1EP, AMAP are type aliases of the respective input types
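// Imports used by this sketch (O = output type, K = key type, defined elsewhere)
import java.util
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.runtime.state.{VoidNamespace, VoidNamespaceSerializer}
import org.apache.flink.streaming.api.datastream.MultipleConnectedStreams
import org.apache.flink.streaming.api.operators.{
  AbstractInput, AbstractStreamOperatorFactory, AbstractStreamOperatorV2, Input,
  InternalTimerService, MultipleInputStreamOperator, StreamOperator,
  StreamOperatorParameters, TimestampedCollector, Triggerable
}
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.transformations.KeyedMultipleInputTransformation
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord

// Implement the operator
class YourEnrichmentOperator(params: StreamOperatorParameters[O])
    // specify the number of inputs (4)
    extends AbstractStreamOperatorV2[O](params, 4)
    with MultipleInputStreamOperator[O]
    with Triggerable[K, VoidNamespace] {

  @transient var collector: TimestampedCollector[O] = null
  @transient var internalTimerService: InternalTimerService[VoidNamespace] = null

  override def open(): Unit = {
    super.open()
    require(configuration != null, "Missing configuration")
    // set up collector and timer service (modelled after other operators)
    collector = new TimestampedCollector[O](output)
    internalTimerService = getInternalTimerService(
      "user-timers",
      VoidNamespaceSerializer.INSTANCE,
      this
    )
  }

  // Implement each input (input IDs are 1-based)
  val input1achep =
    new AbstractInput[ACHEP, O](this, 1) {
      override def processElement(element: StreamRecord[ACHEP]): Unit = {
        val achep: ACHEP = element.getValue
        val et = element.getTimestamp
        val key = getCurrentKey.asInstanceOf[K]
        collector.setAbsoluteTimestamp(et)
        ...
      }
    }

  val input2ceiep =
    new AbstractInput[CEIEP, O](this, 2) {
      ...
    }

  val input3v1ep =
    new AbstractInput[V1EP, O](this, 3) {
      ...
    }

  val input4amap =
    new AbstractInput[AMAP, O](this, 4) {
      ...
    }

  // Triggerable[K, VoidNamespace] additionally requires onEventTime(...) and
  // onProcessingTime(...); they are omitted from this sketch.

  // Implement getInputs with all configured inputs
  override def getInputs: util.List[Input[_]] =
    util.List.of[Input[_]](input1achep, input2ceiep, input3v1ep, input4amap)
}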
// Implement the operator factory
class YourEnrichmentOperatorFactory extends AbstractStreamOperatorFactory[O] {

  override def createStreamOperator[T <: StreamOperator[O]](
      parameters: StreamOperatorParameters[O]): T = {
    val operator = new YourEnrichmentOperator(parameters)
    operator.asInstanceOf[T]
  }

  // Return the operator class itself (not the class of a companion object)
  override def getStreamOperatorClass(
      classLoader: ClassLoader): Class[_ <: StreamOperator[_]] =
    classOf[YourEnrichmentOperator]
}
// Implement job setup
...
def setupJobFromInputs(
    achep: DataStream[ACHEP],
    ceiep: DataStream[CEIEP],
    v1ep: DataStream[V1EP],
    amap: DataStream[AMAP]) = {
  val par = your.parallelism
  val factory = new YourEnrichmentOperatorFactory
  val yourEnrichmentTransformation =
    new KeyedMultipleInputTransformation[O](
      "yourEnrichment",
      factory,
      oTI,
      par,
      kTI
    )

  // Input 1: keyed
  val achepKS = new KeySelector[ACHEP, K] {
    override def getKey(value: ACHEP): K = value.yourId
  }
  yourEnrichmentTransformation.addInput(
    achep.javaStream.getTransformation(),
    achepKS
  )

  // Input 2: broadcast, hence no key selector
  yourEnrichmentTransformation.addInput(
    ceiep.broadcast.javaStream.getTransformation,
    null
  )

  // Input 3: keyed
  val v1epKS = new KeySelector[V1EP, K] {
    override def getKey(value: V1EP): K = value.cardHolderId
  }
  yourEnrichmentTransformation.addInput(
    v1ep.javaStream.getTransformation(),
    v1epKS
  )

  // Input 4: keyed
  val amapKS = new KeySelector[AMAP, K] {
    override def getKey(value: AMAP): K = value.yourId
  }
  yourEnrichmentTransformation.addInput(
    amap.javaStream.getTransformation(),
    amapKS
  )

  val yourEnrichedStream =
    new DataStream[O](
      new MultipleConnectedStreams(env.getJavaEnv)
        .transform(yourEnrichmentTransformation)
    ).withScopedMetaData("yourEnriched")
  yourEnrichedStream
}
...
I hope this helps.
Flink-Greetings
Thias
From: Christian Lorenz <[email protected]>
Sent: Friday, August 9, 2024 2:07 PM
To: Schwalbe Matthias <[email protected]>; Sachin Mittal
<[email protected]>; [email protected]
Subject: RE: Can we share states across tasks/operators
Hi Matthias,
I am facing a similar issue: I need state on 2 keyed input streams, but the
state can only be cleared by events occurring in a third input stream.
Do you perhaps have some examples of how to use the MultipleInputStreamOperator
you mentioned?
Thanks and kind regards,
Christian
From: Schwalbe Matthias <[email protected]>
Date: Wednesday, 7 August 2024 at 12:54
To: Sachin Mittal <[email protected]>, [email protected]
Subject: RE: Can we share states across tasks/operators
Hi Sachin,
Just as an idea: while you cannot easily share state across operators, you can
do so within the same operator:
* For two such input streams, you could connect() the two streams into a
ConnectedStreams and then process() them by means of a KeyedCoProcessFunction
* For more than two input streams, implement some
MultipleInputStreamOperator …
* In both cases you can yield multiple independent output streams (if need
be) by means of multiple side outputs (see, e.g.,
org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction.Context#output)
I do that all the time 😊
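For the two-stream case, a minimal sketch could look like the following. It is only an illustration under stated assumptions, not a tested job: Event, SharedStateSketch, SharedStateFunction and the "unmatched" side output are hypothetical names, both streams are assumed to carry the same type and to be keyed by the String field f, and only the function itself is shown.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction
import org.apache.flink.util.{Collector, OutputTag}

// Hypothetical event type; both streams carry it and are keyed by `f`.
case class Event(f: String, payload: String)

object SharedStateSketch {
  // Hypothetical side output for second-stream events that find no state yet.
  val unmatched: OutputTag[String] = new OutputTag[String]("unmatched", Types.STRING)
}

class SharedStateFunction
    extends KeyedCoProcessFunction[String, Event, Event, String] {

  // One keyed state, visible to both inputs because they share this operator.
  // Lazily initialised so that getRuntimeContext is available on first access.
  @transient private lazy val last: ValueState[String] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[String]("last", Types.STRING))

  // First stream: update the state and emit a record downstream.
  override def processElement1(
      value: Event,
      ctx: KeyedCoProcessFunction[String, Event, Event, String]#Context,
      out: Collector[String]): Unit = {
    last.update(value.payload)
    out.collect(s"${value.f}: stored ${value.payload}")
  }

  // Second stream: read what the first stream stored for the same key.
  override def processElement2(
      value: Event,
      ctx: KeyedCoProcessFunction[String, Event, Event, String]#Context,
      out: Collector[String]): Unit = {
    val seen = last.value() // null if nothing was stored for this key yet
    if (seen == null) {
      // No state yet: route the event to the side output instead.
      ctx.output(SharedStateSketch.unmatched, value.payload)
    } else {
      out.collect(s"${value.f}: ${value.payload} joined with $seen")
      last.update(value.payload)
    }
  }
}
```

To use it, you would connect() the two keyed streams, call process(new SharedStateFunction) on the result, and read the extra stream via getSideOutput with the same tag.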
WDYT?
Sincere Flink greetings
Thias
From: Sachin Mittal <[email protected]>
Sent: Wednesday, August 7, 2024 12:37 PM
To: [email protected]
Subject: Can we share states across tasks/operators
Hi,
I have a stream which starts from a source and is keyed by a field f.
With the stream process function, I can emit the processed record downstream
and also update state based on the records it received for the same key.
Now I have another stream which starts from another source and is of the same
type as the first stream and it is also keyed by the same field f.
In its process function I want to access the last state updated by the first
stream's process function for the same key, do some processing (update the
state) and also send the record downstream.
Is there any way I can achieve this in Flink by connecting to the same state
store?
Is there any concept of global state in Flink if I cannot achieve this by using
keyed states associated with an operator's process function?
Any other way you can think of achieving the same?
Thanks
Sachin
This message is intended only for the named recipient and may contain
confidential or privileged information. As the confidentiality of email
communication cannot be guaranteed, we do not accept any responsibility for the
confidentiality and the intactness of this message. If you have received it in
error, please advise the sender by return e-mail and delete this message and
any attachments. Any unauthorised use or dissemination of this information is
strictly prohibited.
Mapp Digital Germany GmbH with registered offices at Schönhauser Allee 148,
10435 Berlin.
Registered with the District Court München HRB 226181
Managing Directors: Frasier, Christopher & Warren, Steve
This e-mail is from Mapp Digital Group and its international legal entities and
may contain information that is confidential.
If you are not the intended recipient, do not read, copy or distribute the
e-mail or any attachments. Instead, please notify the sender and delete the
e-mail and any attachments.