I'm using the Flink 1.18.1 API on Java 11, and I'm trying to use a
BroadcastProcessFunction to filter a products DataStream, with a DataStream
of authorized brands as the broadcast input.

My first DataStream (products) contains products that each have a brand
field. My second DataStream (brands) contains only the brands that should be
allowed.

The problem is that when products reach processElement of the
BroadcastProcessFunction, brandState does not yet contain all the records
from the brands DataStream. For example, I have 4800 brands in my brands
DataStream, but when the products reach processElement, brandState contains
only a few of them (around 200). As a result, products are filtered out even
though their brands should be allowed, simply because those brands have not
been loaded into brandState yet.
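To make the ordering issue concrete, here is a small plain-Java model of the same filter decision. It uses no Flink code, and the class and method names are made up for illustration. Flink does not guarantee the relative order in which elements of the two connected inputs are processed. The same records can therefore be consumed in different interleavings, and a product whose brand has not arrived yet is dropped.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ArrivalOrderDemo {
    // Hypothetical event: either a brand record (broadcast side) or a product (main side).
    static final class Event {
        final boolean isBrand;
        final String brand;

        Event(boolean isBrand, String brand) {
            this.isBrand = isBrand;
            this.brand = brand;
        }

        static Event ofBrand(String name) { return new Event(true, name); }
        static Event ofProduct(String brand) { return new Event(false, brand); }
    }

    // Same decision as GateCoProcess: a product passes only if its brand is already in the state.
    static List<String> run(List<Event> arrivals) {
        Set<String> brandState = new HashSet<>();
        List<String> emitted = new ArrayList<>();
        for (Event e : arrivals) {
            if (e.isBrand) {
                brandState.add(e.brand);
            } else if (brandState.contains(e.brand)) {
                emitted.add(e.brand);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same records, two possible interleavings of the two inputs.
        List<Event> brandsFirst = List.of(Event.ofBrand("acme"), Event.ofBrand("zenith"),
                Event.ofProduct("acme"), Event.ofProduct("zenith"));
        List<Event> productsFirst = List.of(Event.ofBrand("acme"), Event.ofProduct("acme"),
                Event.ofProduct("zenith"), Event.ofBrand("zenith"));
        System.out.println(run(brandsFirst));   // [acme, zenith]
        System.out.println(run(productsFirst)); // [acme]
    }
}
```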

Here is my BroadcastProcessFunction:

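import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class GateCoProcess extends BroadcastProcessFunction<CrawlData, Brand, CrawlData> {
    private final MapStateDescriptor<String, Boolean> broadcastStateDescriptor;

    public GateCoProcess(MapStateDescriptor<String, Boolean> broadcastStateDescriptor) {
        this.broadcastStateDescriptor = broadcastStateDescriptor;
    }

    // Called for each product: forward it only if its brand is already in the broadcast state.
    @Override
    public void processElement(CrawlData value, ReadOnlyContext ctx, Collector<CrawlData> out)
            throws Exception {
        ReadOnlyBroadcastState<String, Boolean> brandState =
                ctx.getBroadcastState(broadcastStateDescriptor);
        if (brandState.contains(value.data.product.brand)) {
            out.collect(value);
        }
    }

    // Called for each brand record: store active brands in the broadcast state.
    @Override
    public void processBroadcastElement(Brand brand, Context ctx, Collector<CrawlData> out)
            throws Exception {
        BroadcastState<String, Boolean> brandState =
                ctx.getBroadcastState(broadcastStateDescriptor);
        if (brand.active) {
            brandState.put(brand.getName(), true);
        }
    }
}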


And here is how I create the DataStreams and call the function:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Brand> brands = env.fromSource(
        KafkaSources.brandsSource, WatermarkStrategy.noWatermarks(), "gatebrand-cdc-records");

MapStateDescriptor<String, Boolean> broadcastStateDescriptor = new MapStateDescriptor<>(
        "broadcastState",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.BOOLEAN_TYPE_INFO);

BroadcastStream<Brand> broadcastStream = brands.broadcast(broadcastStateDescriptor);

// integration is the products DataStream
DataStream<CrawlData> integration = ExtractData.extractProducts(env);

DataStream<CrawlData> filtered = integration
        .connect(broadcastStream)
        .process(new GateCoProcess(broadcastStateDescriptor));

env.execute("mon job de products");

What should I do to get around this problem? Thanks.
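A pattern often suggested for this kind of race is to hold back main-side elements until the broadcast side has been loaded, and then flush them. The sketch below models that idea in plain Java with no Flink APIs. The class BufferingGate and its onBrandsLoaded() signal are hypothetical. The job above has no built-in "all brands loaded" signal, so the readiness trigger is an assumption (for example a marker record, or a known brand count). In a real job the buffer would also have to be kept in Flink state so it survives restarts, for example keyed ListState in a KeyedBroadcastProcessFunction.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java model of "buffer main-side elements until the broadcast side is ready".
// Not Flink code; the readiness signal (onBrandsLoaded) is a hypothetical assumption.
public class BufferingGate {
    private final Set<String> allowedBrands = new HashSet<>();
    private final List<String> pending = new ArrayList<>();
    private boolean ready = false;

    // Main-side input (products). Returns the brands of the products emitted by this call.
    public List<String> onProduct(String brand) {
        List<String> out = new ArrayList<>();
        if (!ready) {
            pending.add(brand); // hold the product until the brands are loaded
        } else if (allowedBrands.contains(brand)) {
            out.add(brand);
        }
        return out;
    }

    // Broadcast-side input (brands); like GateCoProcess, only active brands are stored.
    public void onBrand(String name, boolean active) {
        if (active) {
            allowedBrands.add(name);
        }
    }

    // Hypothetical signal that the initial brand snapshot is complete: flush buffered products.
    public List<String> onBrandsLoaded() {
        ready = true;
        List<String> out = new ArrayList<>();
        for (String brand : pending) {
            if (allowedBrands.contains(brand)) {
                out.add(brand);
            }
        }
        pending.clear();
        return out;
    }

    public static void main(String[] args) {
        BufferingGate gate = new BufferingGate();
        gate.onBrand("acme", true);
        System.out.println(gate.onProduct("acme"));   // [] (buffered)
        System.out.println(gate.onProduct("zenith")); // [] (buffered, brand not loaded yet)
        gate.onBrand("zenith", true);
        System.out.println(gate.onBrandsLoaded());    // [acme, zenith]
        System.out.println(gate.onProduct("other"));  // [] (brand not allowed)
    }
}
```

The trade-off is latency and memory: products are delayed, and stored, until the readiness signal arrives. How reliably that signal can be produced from a CDC topic is the hard part.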
