[ https://issues.apache.org/jira/browse/BEAM-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kenneth Knowles resolved BEAM-6036. ----------------------------------- Resolution: Not A Problem Fix Version/s: Not applicable > How to periodically refresh side inputs > --------------------------------------- > > Key: BEAM-6036 > URL: https://issues.apache.org/jira/browse/BEAM-6036 > Project: Beam > Issue Type: Bug > Components: beam-model > Reporter: Evgeny > Assignee: Kenneth Knowles > Priority: Blocker > Fix For: Not applicable > > > I have followed the example provided here > [https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1] > in the "Pattern: Slowly-changing lookup cache" section. I've converted the > pseudo-code from the article into this Java code: > return p > .apply("GenerateSequence", GenerateSequence.from(0).withRate(1, > Duration.standardHours(1))) > .apply("GenerateSequenceWindow", > Window.<Long>into(new GlobalWindows()).triggering( > Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())) > .discardingFiredPanes()) > .apply("RetrieveKVs", > ParDo.of(new RetrieveKVs())) > .apply("ToMap", View.asMap()); > RetrieveKVs() queries BigQuery table and outputs KVs. > The issue here is that the resulting map mixes up KVs from different periods > (i.e. the sequence is generated every 1 hour, the resulting map includes KVs > from 2 adjacent hours). > In an attempt to solve it I tried using View.asSingleton() instead. > return p > .apply("GenerateSequence", GenerateSequence.from(0).withRate(1, > Duration.standardHours(1))) > .apply("GenerateSequenceWindow", > Window.<Long>into(new GlobalWindows()).triggering( > > Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane())) > .discardingFiredPanes()) > .apply("RetrieveMap", > ParDo.of(new RetrieveMap())) > .apply("ToMap", View.asSingleton()); > RetrieveMap queries data from BigQuery and outputs the complete map. The > issue with this is it not only results in flaky tests with the exception 1 > times out of 10: > Caused by: java.lang.IllegalArgumentException: Empty PCollection accessed as > a singleton view. Consider setting withDefault to provide a def > ault value > but also it doesn't seem to work. In the logs I see the RetrieveMap is called > every hour, but the pipeline using the side input gets stale data. > Is there a real working example for how to make a side input refresh > periodically? -- This message was sent by Atlassian JIRA (v7.6.3#76005)