Hi,

Could you elaborate what is the problem that you are having? What is the 
exception(s) that you are getting? I have tested such simple example and it’s 
seems to be working as expected:

DataStreamSource<Integer> input = env.fromElements(1, 2, 3, 4, 5, 1, 2, 3);

DataStreamSource<Integer> confStream = env.fromElements(42);

input.keyBy(new MyKeySelector()).connect(confStream.broadcast()).process(new 
MyCoProcessFunction()).print();

Thanks, Piotrek

> On 10 Jan 2018, at 10:01, anujk <kumar.a...@gmail.com> wrote:
> 
> Currently we have an Flink pipeline running with Data-Src —> KeyBy —>
> ProcessFunction.  State Management (with RocksDB) and Timers are working
> well.
> Now we have to extend this by having another Config Stream which we want to
> broadcast to all process operators. So wanted to connect the Data Stream
> with Config Stream (with Config Stream being broadcast) and use
> CoProcessFunction to handle both streams.
> 
> KeyBy uses Hash based partitioning and also if we write CustomPartitioner it
> can return only one partition (Array of SelectedChannel option as in
> BroadcastPartitioner is not allowed).
> Would have liked this to work —
> dataStream.keyBy().connect(confStream.broadcast()).process(…RichCoProcessFunction()…)
> but it says both stream must be keyed.
> 
> Is there any way to make this work?
> 
> dataStream.connect(confStream.broadcast()).flatMap(...
> RichCoFlatMapFunction() …) ==> broadcast works. But we really want KeyBy and
> processFunction functionality.
> 
> Thanks,
> Anuj
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

Reply via email to