I'm using the Flink 1.18.1 API on Java 11, and I'm trying to use a BroadcastProcessFunction to filter a products DataStream, with a DataStream of authorized brands as the broadcast input.
My first DataStream contains products, each of which has a brand field, and my second DataStream contains only the brands that should be allowed.
The problem is that when products reach processElement of the BroadcastProcessFunction, brandState has not yet received all the records from the brands DataStream. For example, my brands DataStream contains 4800 brands, but when the products reach processElement, brandState only contains a few of them (around 200). As a result, some products are filtered out even though their brands are allowed, simply because those brands have not been loaded into brandState yet.
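To show the symptom without Flink, here is a toy, single-threaded model in plain Java 11 (it is not Flink code). As far as I understand, Flink consumes the broadcast input and the regular input independently, so nothing guarantees that every brand is processed before the first product arrives. The model just replays one possible arrival order; the class and method names are made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (NOT Flink): replays one arrival order of the two inputs of a
// BroadcastProcessFunction to show that processElement only sees the brands
// that have arrived so far.
public class BroadcastRaceModel {

    // One input record: either a brand (broadcast side) or a product (regular side).
    static final class Event {
        final boolean isBrand; // true = brand record, false = product record
        final String brand;    // brand name, or the product's brand field

        Event(boolean isBrand, String brand) {
            this.isBrand = isBrand;
            this.brand = brand;
        }
    }

    // Processes events in the given order and returns the brands of the products that pass.
    static List<String> replay(List<Event> arrivalOrder) {
        Set<String> brandState = new HashSet<>(); // stands in for the broadcast MapState
        List<String> passed = new ArrayList<>();
        for (Event e : arrivalOrder) {
            if (e.isBrand) {
                brandState.add(e.brand);          // like processBroadcastElement
            } else if (brandState.contains(e.brand)) {
                passed.add(e.brand);              // like processElement: brand already known
            }
        }
        return passed;
    }

    public static void main(String[] args) {
        // Brands A and B are both allowed, but the product with brand B
        // arrives before brand B itself.
        List<Event> order = List.of(
                new Event(true, "A"),
                new Event(false, "A"),
                new Event(false, "B"),
                new Event(true, "B"));
        System.out.println(replay(order)); // prints [A]
    }
}
```

The product with brand B is dropped even though B is an allowed brand, which is the same effect I see with 4800 brands.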
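Here is my BroadcastProcessFunction:

```java
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

// CrawlData and Brand are my own POJOs (not shown here).
public class GateCoProcess extends BroadcastProcessFunction<CrawlData, Brand, CrawlData> {

    private final MapStateDescriptor<String, Boolean> broadcastStateDescriptor;

    public GateCoProcess(MapStateDescriptor<String, Boolean> broadcastStateDescriptor) {
        this.broadcastStateDescriptor = broadcastStateDescriptor;
    }

    // Products side: forward a product only if its brand is already in the broadcast state.
    @Override
    public void processElement(CrawlData value, ReadOnlyContext ctx, Collector<CrawlData> out)
            throws Exception {
        ReadOnlyBroadcastState<String, Boolean> brandState =
                ctx.getBroadcastState(broadcastStateDescriptor);
        if (brandState.contains(value.data.product.brand)) {
            out.collect(value);
        }
    }

    // Brands side: store every active brand in the broadcast state.
    @Override
    public void processBroadcastElement(Brand brand, Context ctx, Collector<CrawlData> out)
            throws Exception {
        BroadcastState<String, Boolean> brandState =
                ctx.getBroadcastState(broadcastStateDescriptor);
        if (brand.active) {
            brandState.put(brand.getName(), true);
        }
    }
}
```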
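And here is how I create the DataStreams and call the function:

```java
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// The class name is illustrative; KafkaSources and ExtractData are my own helpers.
public class GateJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Brands stream (CDC records read from Kafka).
        DataStream<Brand> brands = env.fromSource(
                KafkaSources.brandsSource, WatermarkStrategy.noWatermarks(), "gatebrand-cdc-records");

        MapStateDescriptor<String, Boolean> broadcastStateDescriptor = new MapStateDescriptor<>(
                "broadcastState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.BOOLEAN_TYPE_INFO);

        // Broadcast the brands to every parallel instance of GateCoProcess.
        BroadcastStream<Brand> broadcastStream = brands.broadcast(broadcastStateDescriptor);

        // integration is the products DataStream
        DataStream<CrawlData> integration = ExtractData.extractProducts(env);

        DataStream<CrawlData> filtered = integration
                .connect(broadcastStream)
                .process(new GateCoProcess(broadcastStateDescriptor));

        // Placeholder sink: the job needs at least one sink, otherwise execute() fails.
        filtered.print();

        env.execute("mon job de products");
    }
}
```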
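For reference, the result I expect is what the filter would produce if every product were checked only after all brands are in the state. The toy model below (plain Java, not Flink) expresses that by holding products back until the brand side signals that it is complete. The end-of-bootstrap marker, the class name and the method names are hypothetical; the sketch says nothing about how such a marker would be produced from the Kafka brands topic.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (NOT Flink): buffer products until the brand side is complete.
// ASSUMPTION: a hypothetical end-of-bootstrap marker arrives on the brand side.
public class BufferedGateModel {
    private final Set<String> allowedBrands = new HashSet<>();
    private final List<String> pendingProducts = new ArrayList<>();
    private final List<String> output = new ArrayList<>();
    private boolean brandsLoaded = false;

    // Brand side: either an allowed brand or the hypothetical "all brands sent" marker.
    void onBrand(String brand, boolean endOfBootstrapMarker) {
        if (endOfBootstrapMarker) {
            brandsLoaded = true;
            // Re-check every held-back product against the now complete brand set.
            for (String product : pendingProducts) {
                emitIfAllowed(product);
            }
            pendingProducts.clear();
        } else {
            allowedBrands.add(brand);
        }
    }

    // Product side: a product, identified here only by its brand.
    void onProduct(String productBrand) {
        if (brandsLoaded) {
            emitIfAllowed(productBrand);
        } else {
            pendingProducts.add(productBrand); // cannot decide yet, so keep it
        }
    }

    private void emitIfAllowed(String productBrand) {
        if (allowedBrands.contains(productBrand)) {
            output.add(productBrand);
        }
    }

    List<String> output() {
        return output;
    }

    public static void main(String[] args) {
        BufferedGateModel gate = new BufferedGateModel();
        gate.onBrand("A", false);
        gate.onProduct("A");
        gate.onProduct("B");      // brand B has not arrived yet
        gate.onProduct("C");      // brand C is never allowed
        gate.onBrand("B", false);
        gate.onBrand(null, true); // hypothetical end-of-bootstrap marker
        System.out.println(gate.output()); // prints [A, B]
    }
}
```

In a real job the pending products would presumably have to live in Flink state so they survive a restart, and the buffer could grow large while the brands are still loading.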
What can I do to get around this problem? Thanks.