[ https://issues.apache.org/jira/browse/BEAM-6036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kenneth Knowles resolved BEAM-6036.
-----------------------------------
       Resolution: Not A Problem
    Fix Version/s: Not applicable

> How to periodically refresh side inputs
> ---------------------------------------
>
>                 Key: BEAM-6036
>                 URL: https://issues.apache.org/jira/browse/BEAM-6036
>             Project: Beam
>          Issue Type: Bug
>          Components: beam-model
>            Reporter: Evgeny
>            Assignee: Kenneth Knowles
>            Priority: Blocker
>             Fix For: Not applicable
>
>
> I have followed the example provided here 
> [https://cloud.google.com/blog/products/gcp/guide-to-common-cloud-dataflow-use-case-patterns-part-1]
>  in the "Pattern: Slowly-changing lookup cache" section. I've converted the 
> pseudo-code from the article into this Java code:
> return p
>     .apply("GenerateSequence",
>         GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
>     .apply("GenerateSequenceWindow",
>         Window.<Long>into(new GlobalWindows())
>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>             .discardingFiredPanes())
>     .apply("RetrieveKVs", ParDo.of(new RetrieveKVs()))
>     .apply("ToMap", View.asMap());
> RetrieveKVs queries a BigQuery table and outputs KVs.
> The issue here is that the resulting map mixes up KVs from different periods
> (i.e. the sequence is generated every hour, but the resulting map includes KVs
> from 2 adjacent hours).
> In an attempt to solve this, I tried using View.asSingleton() instead:
> return p
>     .apply("GenerateSequence",
>         GenerateSequence.from(0).withRate(1, Duration.standardHours(1)))
>     .apply("GenerateSequenceWindow",
>         Window.<Long>into(new GlobalWindows())
>             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>             .discardingFiredPanes())
>     .apply("RetrieveMap", ParDo.of(new RetrieveMap()))
>     .apply("ToMap", View.asSingleton());
> RetrieveMap queries data from BigQuery and outputs the complete map. This
> not only results in flaky tests, which fail 1 time out of 10 with the
> following exception:
> Caused by: java.lang.IllegalArgumentException: Empty PCollection accessed as a singleton view. Consider setting withDefault to provide a default value
> It also doesn't seem to work: the logs show RetrieveMap being called every
> hour, but the pipeline consuming the side input still gets stale data.
> Is there a real working example of how to make a side input refresh
> periodically?
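The question comes down to two requirements: a consumer should see one complete map per refresh (never a merge of two refreshes), and it should get a fallback rather than an exception before the first refresh arrives. Later Beam documentation describes a "slowly updating global window side inputs" pattern that places `Latest.globally()` between the windowed, map-producing `ParDo` and `View.asSingleton()`, and `View.asSingleton()` accepts `.withDefaultValue(...)`; verify both against the docs for your Beam version. The sketch that follows is NOT Beam code: it is a hypothetical plain-Java model (the class `LatestSnapshotModel`, its nested `Snapshot` type and its `offer`/`read` methods are invented for illustration) of the latest-timestamp-wins semantics that pattern relies on, assuming each refresh delivers a whole map with a timestamp.

```java
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model (not a Beam API) of a "latest complete snapshot" side input:
 * each refresh delivers a whole map, and readers only ever see the newest one,
 * never a merge of two refreshes.
 */
public class LatestSnapshotModel {

    // One refresh result: a complete map plus the timestamp it was produced at.
    static final class Snapshot {
        final Instant timestamp;
        final Map<String, String> values;

        Snapshot(Instant timestamp, Map<String, String> values) {
            this.timestamp = timestamp;
            this.values = Collections.unmodifiableMap(new HashMap<>(values));
        }
    }

    private final Map<String, String> defaultValue;
    private Snapshot latest; // null until the first refresh arrives

    LatestSnapshotModel(Map<String, String> defaultValue) {
        this.defaultValue = Collections.unmodifiableMap(new HashMap<>(defaultValue));
    }

    // Replace the current snapshot only if the new one is newer (latest timestamp wins),
    // loosely mirroring the idea behind Beam's Latest.globally().
    synchronized void offer(Snapshot s) {
        if (latest == null || s.timestamp.isAfter(latest.timestamp)) {
            latest = s;
        }
    }

    // Return the whole latest map, or the default before any refresh has arrived,
    // instead of failing the way an empty singleton view does.
    synchronized Map<String, String> read() {
        return latest == null ? defaultValue : latest.values;
    }

    public static void main(String[] args) {
        LatestSnapshotModel side = new LatestSnapshotModel(Map.of());
        System.out.println(side.read()); // prints {}

        Instant t0 = Instant.EPOCH;
        side.offer(new Snapshot(t0, Map.of("a", "1", "b", "2")));
        side.offer(new Snapshot(t0.plusSeconds(3600), Map.of("a", "10")));
        // An older snapshot arriving late is ignored rather than merged in.
        side.offer(new Snapshot(t0.plusSeconds(1800), Map.of("c", "3")));
        System.out.println(side.read()); // prints {a=10}
    }
}
```

The design choice being modelled is replacement rather than accumulation: key "b" from the first refresh disappears once the second refresh lands, which is exactly the property the reporter's View.asMap() pipeline was missing.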



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
