Hi Beam Community,
After running into a high fan-out issue, I found that one possible
solution is to introduce a combination of windowing/group-by steps
between the DoFns that were being fused together, so I started working on
a simple proof of concept. However, when I put everything together, I'm
seeing that the data is being held up in the Combine.perKey step and is
not moving forward. Here's my code:
pb.apply("windowing",
        Window.<FieldValueList>into(FixedWindows.of(Duration.standardSeconds(1)))
            .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
            .withAllowedLateness(Duration.standardSeconds(3))
            .discardingFiredPanes()
    )
    .apply("remapping", MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.integers(),
            TypeDescriptor.of(FieldValueList.class)))
        .via(input ->
            KV.of(input.get("STORE_NBR").getNumericValue().intValue(), input)))
    .apply(Count.perKey())
    .apply("print count", ParDo.of(
        new DoFn<KV<Integer, Long>, Long>() {
            @ProcessElement
            public void processElement(ProcessContext c, BoundedWindow window) {
                String idStr = c.element().getKey().toString() + " - " +
                    c.element().getValue().toString();
                Instant ts = c.timestamp();
                LOG.info(String.format("%s | %s : %s", idStr, ts,
                    window.maxTimestamp().toDateTime().toString()));
            }
        }
    ))
I removed some logging from this snippet, but it's important to clarify
that I do see data moving forward in the pipeline after the "remapping"
step, with the proper windows assigned to each element.
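For anyone who wants to sanity-check window bounds like the ones in those
log lines by hand, here is a minimal plain-Java sketch. It is not Beam
code: the WindowCheck class and its method names are hypothetical, and it
assumes fixed windows with a zero offset, where a window covers
[start, start + size) and its max timestamp is one millisecond before the
end.

```java
// Plain-Java sketch; WindowCheck is a hypothetical helper, not part of Beam.
final class WindowCheck {
    // Start of the fixed window containing tsMillis, assuming a zero offset.
    static long windowStart(long tsMillis, long sizeMillis) {
        return tsMillis - Math.floorMod(tsMillis, sizeMillis);
    }

    // Last timestamp that belongs to the window: 1 ms before its exclusive end.
    static long maxTimestamp(long tsMillis, long sizeMillis) {
        return windowStart(tsMillis, sizeMillis) + sizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = 1000L; // 1-second windows, as in the pipeline above
        long ts = 1_500L;  // example element timestamp in epoch millis
        System.out.println(windowStart(ts, size) + " .. " + maxTimestamp(ts, size));
    }
}
```

An element stamped at 1500 ms with 1-second windows should therefore report
a window max timestamp of 1999 ms.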
Any thoughts on what could be happening here?
Thanks
--
Frank Pinto