Hi Zhipeng

ConnectedIterativeStreams (the type returned by IterativeStream.withFeedbackType())
does have keyBy() methods, but they all throw UnsupportedOperationException [1].

For some context: the goal is message enrichment with async I/O, caching the
enrichment info in keyed state (with TTL).
I am using an iteration because RichAsyncFunction does not support state, and
I didn't find a simpler way to combine async I/O with caching in state.
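To make the caching semantics concrete, here is a minimal plain-Java model (no Flink dependencies) of a per-key cache whose entries expire a fixed time after they are written. It only approximates what Flink keyed state configured with StateTtlConfig (for example UpdateType.OnCreateAndWrite with StateVisibility.NeverReturnExpired) gives you; the class name TtlCache and the explicit nowMillis parameter are hypothetical, chosen to keep the behaviour deterministic.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/**
 * Hypothetical plain-Java model of a per-key TTL cache (NOT Flink code).
 * An entry expires ttlMillis after it was last written; expired entries are
 * never returned and are removed lazily on read.
 */
final class TtlCache<K, V> {
    private static final class Entry<T> {
        final T value;
        final long writtenAtMillis;

        Entry(T value, long writtenAtMillis) {
            this.value = value;
            this.writtenAtMillis = writtenAtMillis;
        }
    }

    private final long ttlMillis;
    private final Map<K, Entry<V>> entries = new HashMap<>();

    TtlCache(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Stores (or refreshes) the value for a key; the write resets its TTL. */
    void put(K key, V value, long nowMillis) {
        entries.put(key, new Entry<>(value, nowMillis));
    }

    /** Returns the value if present and not expired at nowMillis. */
    Optional<V> get(K key, long nowMillis) {
        Entry<V> entry = entries.get(key);
        if (entry == null) {
            return Optional.empty();
        }
        if (nowMillis - entry.writtenAtMillis >= ttlMillis) {
            entries.remove(key); // expired: drop it and report a miss
            return Optional.empty();
        }
        return Optional.of(entry.value);
    }
}
```

A cache miss (empty Optional) is what would trigger the async lookup in the flow below.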

Here is a shortened version of the flow:

-----8<-----8<-----8<-----8<-----8<-----8<----
// Measure, SensorData and EnrichedMeasure are POJOs. The key (sensorId) is an Integer.

/// flow....
DataStream<Measure> measures = env.addSource(new MeasuresSource());
IterativeStream.ConnectedIterativeStreams<Measure, SensorData> iteration = measures
            .iterate()
            .withFeedbackType(TypeInformation.of(SensorData.class));
ConnectedStreams<Measure, SensorData> measureAndSensorDataBySensorId = iteration
            // THIS THROWS UnsupportedOperationException:
            // "Cannot change the input partitioning of an iteration head directly.
            // Apply the partitioning on the input and feedback streams instead."
            .keyBy(Measure::getSensorId, SensorData::getSensorId);

// CachedEnrichment extends KeyedCoProcessFunction<Integer, Measure, SensorData, EnrichedMeasure>.
// It emits cache hits on the main output and cache misses on a side output.
CachedEnrichment cachedEnrichment = new CachedEnrichment();
// Try enrichment from cache
SingleOutputStreamOperator<EnrichedMeasure> cacheHitEnrichedMeasures =
measureAndSensorDataBySensorId
            .process(cachedEnrichment);
DataStream<Measure> cacheMissMeasures = cacheHitEnrichedMeasures
            .getSideOutput(cachedEnrichment.cacheMissOutputTag);

// On cache miss fetch SensorData with async IO
SingleOutputStreamOperator<Tuple2<EnrichedMeasure, SensorData>> enrichedMeasuresAndFetchedSensorData =
            AsyncDataStream.unorderedWait(
                    cacheMissMeasures,
                    // AsyncEnrich extends RichAsyncFunction<Measure, Tuple2<EnrichedMeasure, SensorData>>
                    new AsyncEnrich(),
                    ASYNC_CALL_TIMEOUT, TimeUnit.MILLISECONDS,
                    ASYNC_OPERATOR_CAPACITY);

// Close the loop with newly fetched SensorData
iteration.closeWith(enrichedMeasuresAndFetchedSensorData.map(t -> t.f1));

// Merge outputs
DataStream<EnrichedMeasure> allEnrichedMeasures = cacheHitEnrichedMeasures
            .union(enrichedMeasuresAndFetchedSensorData.map(t -> t.f0));
-----8<-----8<-----8<-----8<-----8<-----8<----
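The intended semantics of the loop above (cache hit on the main output, cache miss through async I/O, fetched SensorData fed back into the per-sensor cache, both outputs merged) can be sketched in plain Java, without Flink. This is a single-threaded model under stated assumptions: the fields of Measure, SensorData and EnrichedMeasure and the fetch function are hypothetical, and join() makes each fetch complete before the next measure is processed. In the real job the feedback arrives asynchronously, so measures for the same sensor arriving before the feedback would also miss the cache.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical minimal stand-ins for the POJOs in the flow (not the real ones).
final class Measure {
    final int sensorId;
    final double value;
    Measure(int sensorId, double value) { this.sensorId = sensorId; this.value = value; }
}

final class SensorData {
    final int sensorId;
    final String location;
    SensorData(int sensorId, String location) { this.sensorId = sensorId; this.location = location; }
}

final class EnrichedMeasure {
    final Measure measure;
    final SensorData sensorData;
    EnrichedMeasure(Measure measure, SensorData sensorData) {
        this.measure = measure;
        this.sensorData = sensorData;
    }
}

/** Single-threaded model of the cache-then-async-fetch loop; NOT Flink code. */
final class EnrichmentLoopModel {
    // Plays the role of the keyed state in CachedEnrichment (one entry per sensorId).
    private final Map<Integer, SensorData> cacheBySensorId = new HashMap<>();
    // Plays the role of the async lookup done by AsyncEnrich.
    private final Function<Integer, CompletableFuture<SensorData>> fetch;
    private int fetchCount = 0;

    EnrichmentLoopModel(Function<Integer, CompletableFuture<SensorData>> fetch) {
        this.fetch = fetch;
    }

    int fetchCount() { return fetchCount; }

    /** Returns the union of cache-hit and freshly fetched enrichments, in input order. */
    List<EnrichedMeasure> process(List<Measure> measures) {
        List<EnrichedMeasure> allEnriched = new ArrayList<>();
        for (Measure m : measures) {
            SensorData cached = cacheBySensorId.get(m.sensorId);
            if (cached != null) {
                allEnriched.add(new EnrichedMeasure(m, cached)); // cache hit: main output
                continue;
            }
            // Cache miss: fetch asynchronously; join() waits so the model stays deterministic.
            fetchCount++;
            SensorData fetched = fetch.apply(m.sensorId).join();
            cacheBySensorId.put(fetched.sensorId, fetched); // the feedback edge (closeWith)
            allEnriched.add(new EnrichedMeasure(m, fetched)); // merged via union
        }
        return allEnriched;
    }
}
```

With measures for sensors 1, 1 and 2, only two fetches happen: the second measure for sensor 1 is served from the cache.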

Also, the message of the UnsupportedOperationException thrown by
ConnectedIterativeStreams.keyBy() ("...Apply the partitioning on the input and
feedback streams instead.") does not look right.

I tried that (I built a loop over a single stream of
Either<Measure, SensorData>), but there seems to be no way to process an
IterativeStream with a KeyedProcessFunction, nor to feed a KeyedStream back
into the loop.

-----8<-----8<-----8<-----8<-----8<-----8<----
....
KeyedStream<Either<Measure, SensorData>, Integer> measuresOrSensorDataBySensorId = measures
                .map(m -> Either.<Measure, SensorData>Left(m))
                .returns(new TypeHint<>() {})
                .keyBy(msd -> msd.isLeft() ? msd.left().getSensorId() : msd.right().getSensorId());
IterativeStream<Either<Measure, SensorData>> iteration =
measuresOrSensorDataBySensorId.iterate();

CachedEnrichment cachedEnrichment = new CachedEnrichment();
// The following line DOES NOT COMPILE: IterativeStream.process() expects a
// ProcessFunction, not a KeyedProcessFunction
SingleOutputStreamOperator<EnrichedMeasure> cacheHitEnrichedMeasures =
                iteration.<EnrichedMeasure>process(cachedEnrichment, TypeInformation.of(EnrichedMeasure.class));
....
KeyedStream<SensorData, Integer> fetchedSensorDataBySensorId =
enrichedMeasureAndFetchedSensorData
                .map(t -> t.f1).keyBy(SensorData::getSensorId);
// The following line DOES NOT COMPILE: closeWith() does not expect a KeyedStream
iteration.closeWith(fetchedSensorDataBySensorId);
-----8<-----8<-----8<-----8<-----8<-----8<----
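The key selector on the Either stream above relies on both variants exposing the same key, so a Measure and the SensorData for the same sensor map to the same key. A plain-Java sketch of that idea, using a hypothetical SimpleEither that mimics the isLeft()/left()/right() accessors of org.apache.flink.types.Either (it is not the Flink class), with hypothetical minimal Measure and SensorData types:

```java
import java.util.Objects;

/** Hypothetical tagged union mimicking Flink's Either accessors; NOT org.apache.flink.types.Either. */
final class SimpleEither<L, R> {
    private final L left;   // non-null iff this is a Left
    private final R right;  // non-null iff this is a Right

    private SimpleEither(L left, R right) {
        this.left = left;
        this.right = right;
    }

    static <L, R> SimpleEither<L, R> ofLeft(L value) {
        return new SimpleEither<>(Objects.requireNonNull(value), null);
    }

    static <L, R> SimpleEither<L, R> ofRight(R value) {
        return new SimpleEither<>(null, Objects.requireNonNull(value));
    }

    boolean isLeft() { return left != null; }

    L left() {
        if (!isLeft()) throw new IllegalStateException("not a Left");
        return left;
    }

    R right() {
        if (isLeft()) throw new IllegalStateException("not a Right");
        return right;
    }
}

// Hypothetical minimal payload types carrying only the key.
final class Measure {
    private final int sensorId;
    Measure(int sensorId) { this.sensorId = sensorId; }
    int getSensorId() { return sensorId; }
}

final class SensorData {
    private final int sensorId;
    SensorData(int sensorId) { this.sensorId = sensorId; }
    int getSensorId() { return sensorId; }
}

final class SensorKeys {
    /** Same logic as the keyBy lambda above: key by sensorId, whichever side is present. */
    static Integer sensorIdOf(SimpleEither<Measure, SensorData> msd) {
        return msd.isLeft() ? msd.left().getSensorId() : msd.right().getSensorId();
    }
}
```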

I will have a look at the iteration module you mentioned; I wasn't aware of it.

Thanks
Lorenzo

[1]
https://github.com/apache/flink/blob/release-1.15.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java#L196


On Sat, 18 Feb 2023 at 12:38, Zhipeng Zhang <zhangzhipe...@gmail.com> wrote:

> Hi Lorenzo,
>
> Could you provide a code example that reproduces your problem? As I
> understand it, IterativeStream#keyBy is supported, since IterativeStream is a
> subclass of DataStream.
>
> Moreover, we have implemented a unified iteration module for Flink
> [1] in Flink ML [2], which relies on Flink 1.15.2. You could also give it a
> try.
>
> [1]
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300
> [2]
> https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
>
> Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote on Sat, 18 Feb 2023 at 17:00:
> >
> > Hi all,
> >
> > I am trying to implement an iterative streaming job that processes the
> > loop with a KeyedProcessFunction.
> >
> > I need a KeyedProcessFunction to use keyed state and to emit a
> > side output (which, after further transformations, becomes the feedback).
> >
> > The problem is that IterativeStream.process() only accepts a ProcessFunction,
> > not a KeyedProcessFunction.
> >
> > The main and feedback streams have the same payload type, and I am
> > keying both before starting and closing the iteration.
> > I understand I cannot re-key after starting the iteration, as
> > IterativeStream does not support keyBy() and throws an
> > UnsupportedOperationException "Cannot change the input partitioning of an
> > iteration head directly. Apply the partitioning on the input and feedback
> > streams instead."
> >
> > Is there any way of using keyed state within an iteration?
> > BTW, I am using Flink 1.15.2 and I am bound to that version.
> >
> > Regards
> > Lorenzo
>
>
>
> --
> best,
> Zhipeng
>
