Hi Anuj, connecting a keyed stream and a broadcasted stream is not supported at the moment but there is work in progress [1] to add this functionality for the next release (Flink 1.5.0).
Best, Fabian [1] https://issues.apache.org/jira/browse/FLINK-3659 2018-01-10 12:21 GMT+01:00 Piotr Nowojski <pi...@data-artisans.com>: > Hi, > > Could you elaborate what is the problem that you are having? What is the > exception(s) that you are getting? I have tested such simple example and > it’s seems to be working as expected: > > DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3); > > DataStreamSource<Integer> confStream = env.fromElements(42); > > input.keyBy(new MyKeySelector()).connect(confStream.broadcast()).process(new > MyCoProcessFunction()).print(); > > > Thanks, Piotrek > > On 10 Jan 2018, at 10:01, anujk <kumar.a...@gmail.com> wrote: > > Currently we have an Flink pipeline running with Data-Src —> KeyBy —> > ProcessFunction. State Management (with RocksDB) and Timers are working > well. > Now we have to extend this by having another Config Stream which we want to > broadcast to all process operators. So wanted to connect the Data Stream > with Config Stream (with Config Stream being broadcast) and use > CoProcessFunction to handle both streams. > > KeyBy uses Hash based partitioning and also if we write CustomPartitioner > it > can return only one partition (Array of SelectedChannel option as in > BroadcastPartitioner is not allowed). > Would have liked this to work — > dataStream.keyBy().connect(confStream.broadcast()).process(… > RichCoProcessFunction()…) > but it says both stream must be keyed. > > Is there any way to make this work? > > dataStream.connect(confStream.broadcast()).flatMap(... > RichCoFlatMapFunction() …) ==> broadcast works. But we really want KeyBy > and > processFunction functionality. > > Thanks, > Anuj > > > > -- > Sent from: http://apache-flink-user-mailing-list-archive.2336050. > n4.nabble.com/ > > >