Hi, Could you elaborate what is the problem that you are having? What is the exception(s) that you are getting? I have tested such simple example and it’s seems to be working as expected:
DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3); DataStreamSource<Integer> confStream = env.fromElements(42); input.keyBy(new MyKeySelector()).connect(confStream.broadcast()).process(new MyCoProcessFunction()).print(); Thanks, Piotrek > On 10 Jan 2018, at 10:01, anujk <kumar.a...@gmail.com> wrote: > > Currently we have an Flink pipeline running with Data-Src —> KeyBy —> > ProcessFunction. State Management (with RocksDB) and Timers are working > well. > Now we have to extend this by having another Config Stream which we want to > broadcast to all process operators. So wanted to connect the Data Stream > with Config Stream (with Config Stream being broadcast) and use > CoProcessFunction to handle both streams. > > KeyBy uses Hash based partitioning and also if we write CustomPartitioner it > can return only one partition (Array of SelectedChannel option as in > BroadcastPartitioner is not allowed). > Would have liked this to work — > dataStream.keyBy().connect(confStream.broadcast()).process(…RichCoProcessFunction()…) > but it says both stream must be keyed. > > Is there any way to make this work? > > dataStream.connect(confStream.broadcast()).flatMap(... > RichCoFlatMapFunction() …) ==> broadcast works. But we really want KeyBy and > processFunction functionality. > > Thanks, > Anuj > > > > -- > Sent from: > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/