Hello.
I am having an issue with a Flink 1.8 pipeline when trying to consume
broadcast state across multiple operators.  I currently
have a working pipeline that looks like the following:

records
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessGenerator(
            Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
    .keyBy(new ApplicationNameKeySelector())
    .window(
        TumblingEventTimeWindows.of(
            Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
    .aggregate(new Aggregator())
    .connect(configurationBroadcastStream)
    .process(excluder)
    .addSink(KinesisProducer.createSinkFromStaticConfig(properties));

* records is a stream from a FlinkKafkaConsumer
* configurationBroadcastStream is a broadcast stream from a FlinkKafkaConsumer
* Aggregator is an AggregateFunction
* excluder is a BroadcastProcessFunction


I now have a requirement to filter out transactions at the beginning of the
pipeline using the same broadcast stream that I consume towards the end of
the pipeline. I updated the pipeline to look like this:

records
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessGenerator(
            Long.parseLong(properties.getProperty("flinkMaxOutOfOrderness"))))
    .connect(configurationBroadcastStream)  // new
    .process(filter)                        // new
    .keyBy(new ApplicationNameKeySelector())
    .window(
        TumblingEventTimeWindows.of(
            Time.seconds(Long.parseLong(properties.getProperty("flinkWindow")))))
    .aggregate(new Aggregator())
    .connect(configurationBroadcastStream)
    .process(excluder)
    .addSink(KinesisProducer.createSinkFromStaticConfig(properties));

* records is a stream from a FlinkKafkaConsumer
* configurationBroadcastStream is a broadcast stream from a FlinkKafkaConsumer
* Aggregator is an AggregateFunction
* filter and excluder are both BroadcastProcessFunctions

With this change, the aggregated records never reach the excluder's
processElement method.

1. The Aggregator's add method is called. I can see this in the logs.
2. The Aggregator's getResult method is never called. This makes me think
this is a window issue.
3. The processBroadcastElement methods of both broadcast functions are
called and receive the broadcast state. I see this in the logs.
4. The pipeline definitely worked prior to me adding in the second .connect
and .process at the beginning of the pipeline.
5. I have considered creating a new record object from the new
process(filter) that contains the config retrieved from the broadcast
stream along with the transactions and passing that down the pipeline but
that is really not desirable.
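
One possible explanation for observation 2, offered as an assumption rather
than a confirmed diagnosis: an operator with two inputs advances its
event-time clock to the minimum of the watermarks seen on its inputs. If
configurationBroadcastStream has no timestamp/watermark assigner, the new
process(filter) step would never forward a watermark, so the tumbling window
downstream would keep accumulating records (add is called) without ever
firing (getResult is not). The sketch below models that rule in plain Java
with no Flink dependency. CombinedWatermark and its methods are hypothetical
names chosen for illustration and are not Flink APIs.

```java
import java.util.Arrays;

// Hypothetical plain-Java model of the "minimum over inputs" watermark rule.
// It does not use Flink; it only illustrates the suspected mechanism.
final class CombinedWatermark {
    // Latest watermark seen on each input; an input that has not emitted
    // any watermark yet stays at Long.MIN_VALUE.
    private final long[] inputWatermarks;

    CombinedWatermark(int numberOfInputs) {
        inputWatermarks = new long[numberOfInputs];
        Arrays.fill(inputWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one input and returns the combined watermark.
    long onWatermark(int input, long timestamp) {
        inputWatermarks[input] = Math.max(inputWatermarks[input], timestamp);
        return current();
    }

    // The combined watermark is the minimum across all inputs.
    long current() {
        return Arrays.stream(inputWatermarks).min().getAsLong();
    }

    // An event-time window [start, windowEnd) fires once the watermark
    // reaches its last included timestamp, windowEnd - 1.
    boolean windowFires(long windowEnd) {
        return current() >= windowEnd - 1;
    }

    public static void main(String[] args) {
        CombinedWatermark afterFilter = new CombinedWatermark(2);
        long windowEnd = 60_000L; // end of a 60-second tumbling window

        // Input 0 models records (has watermarks); input 1 models the
        // broadcast stream, assumed here to emit no watermarks at all.
        afterFilter.onWatermark(0, 120_000L);
        System.out.println("window fires with silent broadcast input: "
                + afterFilter.windowFires(windowEnd));

        // Once the second input also reports progress, the window can fire.
        afterFilter.onWatermark(1, Long.MAX_VALUE);
        System.out.println("window fires after broadcast watermark: "
                + afterFilter.windowFires(windowEnd));
    }
}
```

If this is what is happening, two things that might be worth trying are
moving assignTimestampsAndWatermarks to after process(filter), or giving the
broadcast source a watermark assigner that does not hold event time back.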

Any ideas on what might be going on here?

Thanks!
Roger
