Hi Zhipeng,

IterativeStreams does have keyBy() methods, but they all throw UnsupportedOperationException [1].
For some context: the whole thing is to do message enrichment with async I/O, caching the enrichment info in state (with TTL). I am using an iteration because RichAsyncFunction does not support state, and I didn't find a simpler way to do both async I/O and caching in state.

Here is a shortened version of the flow:

-----8<-----8<-----8<-----8<-----8<-----8<----
// Measure, SensorData and EnrichedMeasure are POJOs. The key (sensorId) is an Integer.

/// flow....
DataStream<Measure> measures = env.addSource(new MeasuresSource());

IterativeStream.ConnectedIterativeStreams<Measure, SensorData> iteration = measures
        .iterate()
        .withFeedbackType(TypeInformation.of(SensorData.class));

ConnectedStreams<Measure, SensorData> measureAndSensorDataBySensorId = iteration
        // THIS THROWS UnsupportedOperationException
        // "Cannot change the input partitioning of an iteration head directly. Apply the partitioning on the input and feedback streams instead."
        .keyBy(Measure::getSensorId, SensorData::getSensorId);

// CachedEnrichment extends KeyedCoProcessFunction<Integer, Measure, SensorData, EnrichedMeasure>
// It emits cache hit on main output and cache miss on a side-output
CachedEnrichment cachedEnrichment = new CachedEnrichment();

// Try enrichment from cache
SingleOutputStreamOperator<EnrichedMeasure> cacheHitEnrichedMeasures = measureAndSensorDataBySensorId
        .process(cachedEnrichment);

DataStream<Measure> cacheMissMeasures = cacheHitEnrichedMeasures
        .getSideOutput(cachedEnrichment.cacheMissOutputTag);

// On cache miss fetch SensorData with async IO
SingleOutputStreamOperator<Tuple2<EnrichedMeasure, SensorData>> enrichedMeasuresAndFetchedSensorData =
        AsyncDataStream.unorderedWait(
                cacheMissMeasures,
                // AsyncEnrich extends RichAsyncFunction<Measure, Tuple2<EnrichedMeasure, SensorData>>
                new AsyncEnrich(),
                ASYNC_CALL_TIMEOUT,
                TimeUnit.MILLISECONDS,
                ASYNC_OPERATOR_CAPACITY);

// Close the loop with newly fetched SensorData
iteration.closeWith(enrichedMeasuresAndFetchedSensorData.map(t -> t.f1));

// Merge outputs
DataStream<EnrichedMeasure> allEnrichedMeasures = cacheHitEnrichedMeasures
        .union(enrichedMeasuresAndFetchedSensorData.map(t -> t.f0));
-----8<-----8<-----8<-----8<-----8<-----8<----

Also, the message of the UnsupportedOperationException thrown by IterativeStreams.keyBy() ("...Apply the partitioning on the input and feedback streams instead") does not look right. I tried that (I made a loop with a single stream of Either<Measure, SensorData>), but it seems there is no way of processing an IterativeStream with a KeyedProcessFunction, nor of feeding a KeyedStream back into the loop.

-----8<-----8<-----8<-----8<-----8<-----8<----
....
KeyedStream<Either<Measure, SensorData>, Integer> measuresOrSensorDataBySensorId = measures
        .map(m -> Either.<Measure, SensorData>Left(m)).returns(new TypeHint<>() {})
        .keyBy(msd -> msd.isLeft() ? msd.left().getSensorId() : msd.right().getSensorId());

IterativeStream<Either<Measure, SensorData>> iteration = measuresOrSensorDataBySensorId.iterate();

CachedEnrichment cachedEnrichment = new CachedEnrichment();

// The following line DOES NOT COMPILE: IterativeStream.process() expects ProcessFunction, not KeyedProcessFunction
SingleOutputStreamOperator<EnrichedMeasure> cacheHitEnrichedMeasures =
        iteration.<EnrichedMeasure>process(cachedEnrichment, TypeInformation.of(EnrichedMeasure.class));
....
KeyedStream<SensorData, Integer> fetchedSensorDataBySensorId = enrichedMeasuresAndFetchedSensorData
        .map(t -> t.f1).keyBy(SensorData::getSensorId);

// The following line DOES NOT COMPILE: closeWith() does not expect KeyedStream
iteration.closeWith(fetchedSensorDataBySensorId);
-----8<-----8<-----8<-----8<-----8<-----8<----

I will have a look at the iteration module you mentioned. I wasn't aware of it.
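
In case it helps to see what CachedEnrichment is meant to do per key, here is a minimal plain-Java model of the hit/miss routing, with no Flink dependency. It is only a sketch under assumptions: the class name, the String payloads and the explicit nowMillis argument are hypothetical stand-ins, the HashMap plays the role of the keyed state with TTL, and the two lists play the roles of the main output (cache hits) and the side-output (cache misses). It is not the actual operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the cache-hit / cache-miss routing; hypothetical names, no Flink. */
final class EnrichmentCacheSketch {

    /** Stand-in for one keyed-state entry with an expiry time. */
    private static final class Entry {
        final String sensorData;
        final long expiresAtMillis;

        Entry(String sensorData, long expiresAtMillis) {
            this.sensorData = sensorData;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final long ttlMillis;
    private final Map<Integer, Entry> cacheBySensorId = new HashMap<>();

    // Stand-ins for the main output (enriched measures) and the side-output (misses).
    final List<String> enriched = new ArrayList<>();
    final List<Integer> misses = new ArrayList<>();

    EnrichmentCacheSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Feedback input: cache freshly fetched sensor data and restart its TTL. */
    void onSensorData(int sensorId, String sensorData, long nowMillis) {
        cacheBySensorId.put(sensorId, new Entry(sensorData, nowMillis + ttlMillis));
    }

    /** Main input: enrich from the cache on a hit, otherwise route the key to the miss output. */
    void onMeasure(int sensorId, double value, long nowMillis) {
        Entry entry = cacheBySensorId.get(sensorId);
        if (entry != null && nowMillis < entry.expiresAtMillis) {
            enriched.add(sensorId + ":" + value + ":" + entry.sensorData);
        } else {
            cacheBySensorId.remove(sensorId); // drop an expired entry, if any
            misses.add(sensorId);
        }
    }

    public static void main(String[] args) {
        EnrichmentCacheSketch op = new EnrichmentCacheSketch(1000L);
        op.onMeasure(7, 21.5, 0L);        // nothing cached yet: miss
        op.onSensorData(7, "roof", 10L);  // fetched data arrives on the feedback edge
        op.onMeasure(7, 22.0, 20L);       // served from the cache
        op.onMeasure(7, 22.5, 2000L);     // entry has expired: miss again
        System.out.println("hits: " + op.enriched + ", misses: " + op.misses);
    }
}
```

The point of the model is that both inputs must reach the same per-key entry, which is why I need the measures and the fed-back SensorData keyed identically inside the loop.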

Thanks
Lorenzo

[1] https://github.com/apache/flink/blob/release-1.15.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/IterativeStream.java#L196

On Sat, 18 Feb 2023 at 12:38, Zhipeng Zhang <zhangzhipe...@gmail.com> wrote:

> Hi Lorenzo,
>
> Could you provide some code example to reproduce your question? As I
> understand, IterativeStream#keyBy is supported since it is a subclass
> of DataStream.
>
> Moreover, we have implemented a unified iteration module for Flink
> [1] in Flink ML [2], which relies on Flink 1.15.2. Probably you can
> also have a try.
>
> [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184615300
> [2] https://github.com/apache/flink-ml/blob/master/flink-ml-iteration/src/main/java/org/apache/flink/iteration/Iterations.java
>
> Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote on Sat, 18 Feb 2023 at 17:00:
> >
> > Hi all,
> >
> > I am trying to implement an iterative streaming job that processes the
> > loop with a KeyedProcessFunction.
> >
> > I need a KeyedProcessFunction to use keyed state and to emit a
> > side-output (that after further transformations becomes the feedback).
> >
> > The problem is that IterativeStream.process() only accepts
> > ProcessFunction, not KeyedProcessFunction.
> >
> > The main and feedback streams have the same payload type, and I am
> > keying both before starting and closing the iteration.
> > I understand I cannot re-key after starting the iteration, as
> > IterativeStream does not support keyBy() and throws an
> > UnsupportedOperationException "Cannot change the input partitioning of an
> > iteration head directly. Apply the partitioning on the input and feedback
> > streams instead."
> >
> > Is there any way of using keyed state within an iteration?
> > BTW,
> > I am using Flink 1.15.2 and I am bound to that version.
> >
> > Regards
> > Lorenzo
>
> --
> best,
> Zhipeng