Hi Ken,

This is actually a bug: a Partition should not require a UID. It is
fixed in 1.9.2 and 1.10; see FLINK-14910
<https://jira.apache.org/jira/browse/FLINK-14910>.
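
To illustrate the intended behaviour, here is a minimal, self-contained
sketch. It is a toy model and not Flink's actual code: the Node class and
the validateUids method are hypothetical names made up for this example,
and it assumes the fix amounts to skipping the UID check for partitioning
steps, since they hold no state.

```java
import java.util.Arrays;
import java.util.List;

public class UidCheckSketch {

    /** Hypothetical stand-in for one step of a streaming job (not a Flink class). */
    public static final class Node {
        final String name;
        final String uid;          // null when the user assigned no UID
        final boolean isPartition; // true for a keyBy()-style partitioning step

        public Node(String name, String uid, boolean isPartition) {
            this.name = name;
            this.uid = uid;
            this.isPartition = isPartition;
        }
    }

    /** Throws if auto-generated UIDs are disabled and a non-partition step lacks a UID. */
    public static void validateUids(List<Node> nodes, boolean autoUidsEnabled) {
        if (autoUidsEnabled) {
            return; // missing UIDs would be generated automatically
        }
        for (Node node : nodes) {
            if (node.isPartition) {
                continue; // assumption: partitioning holds no state, so no UID is needed
            }
            if (node.uid == null) {
                throw new IllegalStateException(
                        "Auto generated UIDs have been disabled but no UID or hash "
                                + "has been assigned to operator " + node.name);
            }
        }
    }

    public static void main(String[] args) {
        List<Node> job = Arrays.asList(
                new Node("taxi rides", "source: taxi rides", false),
                new Node("Partition", null, true),
                new Node("enrich rides with fares", "function: enrich rides with fares", false));
        // The partition step has no UID, but the check skips it.
        validateUids(job, false);
        System.out.println("UID check passed");
    }
}
```

On versions without the fix, the workaround Ken found (setting the UID on
the keyed stream's transformation) should still apply.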

Thanks,
Zhu Zhu

Ken Krugler <kkrugler_li...@transpac.com> wrote on Fri, Jan 10, 2020 at 7:51 AM:

> Hi all,
>
> [Of course, right after hitting send I realized I could just do
> rides.getTransformation().setUid("blah"), ditto for the fares stream. Might
> be something to add to the docs, or provide a .uid() method on KeyedStreams
> for syntactic sugar]
>
> Just for grins, I disabled auto-generated UIDs for the taxi rides/fares
> state example in the online tutorial.
>
>             env.getConfig().disableAutoGeneratedUIDs();
>
> I then added UIDs for all operators, sources & sinks. But I still get the
> following when calling env.getExecutionPlan() or env.execute():
>
> java.lang.IllegalStateException: Auto generated UIDs have been disabled but no UID or hash has been assigned to operator Partition
> at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:297)
> at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformTwoInputTransform(StreamGraphGenerator.java:682)
> at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transform(StreamGraphGenerator.java:252)
> at org.apache.flink.streaming.api.graph.StreamGraphGenerator.generate(StreamGraphGenerator.java:209)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1529)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getExecutionPlan(StreamExecutionEnvironment.java:1564)
> at com.citi.flink.RidesAndFaresTool.main(RidesAndFaresTool.java:63)
>
> The simple workflow is:
>
>         DataStream<TaxiRide> rides = env
>                 .addSource(new CheckpointedTaxiRideSource(ridesFile, servingSpeedFactor))
>                 .uid("source: taxi rides")
>                 .name("taxi rides")
>                 .filter((TaxiRide ride) -> ride.isStart)
>                 .uid("filter: only start rides")
>                 .name("only start rides")
>                 .keyBy((TaxiRide ride) -> ride.rideId);
>
>         DataStream<TaxiFare> fares = env
>                 .addSource(new CheckpointedTaxiFareSource(faresFile, servingSpeedFactor))
>                 .uid("source: taxi fares")
>                 .name("taxi fares")
>                 .keyBy((TaxiFare fare) -> fare.rideId);
>
>         DataStreamSink<Tuple2<TaxiRide, TaxiFare>> enriched = rides
>                 .connect(fares)
>                 .flatMap(new EnrichmentFunction())
>                 .uid("function: enrich rides with fares")
>                 .name("enrich rides with fares")
>                 .addSink(sink)
>                 .uid("sink: enriched taxi rides")
>                 .name("enriched taxi rides");
>
> Internally the exception is thrown when the EnrichmentFunction (a
> RichCoFlatMapFunction) is being transformed by
> StreamGraphGenerator.transformTwoInputTransform().
>
> This calls StreamGraphGenerator.transform() with the two inputs, but the
> Transformation for each input is a PartitionTransformation.
>
> I don’t see a way to set the UID following the keyBy(), as a KeyedStream
> creates the PartitionTransformation without a UID.
>
> Any insight into setting the UID properly here? Or should
> StreamGraphGenerator.transform() skip the no-uid check for
> PartitionTransformation, since that’s not an operator with state?
>
> Thanks,
>
> — Ken
>
> --------------------------
> Ken Krugler
> http://www.scaleunlimited.com
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
>
>
