Yep, that is the basic pattern I am looking for. A couple of comments:

1. I can "poke" the side input via pubsub when I want to update the input,
so I don't need any other mechanism to force reload.
2. I don't need to reprocess elements when I get a new side input. As long
as the side input is updated eventually (within reason!), that is fine.

Any suggestions on using existing mechanisms to make this work?
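
For concreteness, here is roughly the shape I have in mind, adapted from the
slowly-updating side input pattern: since each schema message carries the
full set of schemas, emit the whole map as one element and keep only the
latest one via a singleton view. This is an untested sketch; ParseSchemasFn
is a placeholder for whatever turns one schema message into the map:

// Untested sketch. ParseSchemasFn is a placeholder
// DoFn<String, Map<String, String>> that parses one schema
// message into the full schema map.
PCollectionView<Map<String, String>> schemaView = pipeline
    .apply("Read Schema",
        PubsubIO.readStrings().fromTopic("topic_for_schema"))
    .apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes()
        .withAllowedLateness(Duration.ZERO))  // org.joda.time.Duration
    .apply("Parse Schemas", ParDo.of(new ParseSchemasFn()))
    // Keep only the newest map; each trigger firing replaces the singleton.
    .apply(Latest.<Map<String, String>>globally())
    .apply(View.<Map<String, String>>asSingleton());

The main flow's DoFn would then get the whole map from
c.sideInput(schemaView) and look up the schema by name, rather than doing a
per-key get against an asMap view.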

On a side note, the reason I'm looking into side inputs:
I tried just having the DoFns initialize the schema map on the first
element, but that leads to a runaway: element 1 takes a while ->
Dataflow starts more threads -> those also take a while, since each one
builds its own schema map -> more threads -> more maps created -> OOM!
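
(For reference, the kind of per-process guard I'd need to make that approach
survive looks something like the sketch below; loadSchemaMap() is a
placeholder for the slow load, and even with the guard, every new worker
still stalls on its first element.)

// Sketch of the workaround I'd rather avoid: share one schema map per
// worker JVM via double-checked locking, so extra threads reuse it
// instead of each building their own copy. loadSchemaMap() is a
// placeholder for the slow load.
class ParseFn extends DoFn<String, TableRow> {
  private static volatile Map<String, String> schemaMap;

  private static Map<String, String> getOrLoadSchemas() {
    Map<String, String> local = schemaMap;
    if (local == null) {
      synchronized (ParseFn.class) {
        local = schemaMap;
        if (local == null) {
          schemaMap = local = loadSchemaMap();  // placeholder
        }
      }
    }
    return local;
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Map<String, String> schemas = getOrLoadSchemas();
    // ... parse c.element() using schemas ...
  }
}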

On Wed, Jun 14, 2017 at 12:57 PM, Eugene Kirpichov <[email protected]>
wrote:

> Seems related to https://issues.apache.org/jira/browse/BEAM-1197?
>
> On Wed, Jun 14, 2017 at 11:39 AM Kevin Peterson <[email protected]>
> wrote:
>
>> Hi all,
>>
>> I am working on a (streaming) pipeline that reads elements from Pubsub,
>> and schemas for those elements from a separate Pubsub topic. I'd like to
>> be able to create a side input map from the schema topic, and have that
>> available to the main pipeline for parsing. Each message on the schema
>> topic contains all schemas I care about, so for every new message, I
>> want to generate a new map that will be available to the main pipeline
>> (eventual consistency is fine). I don't have any windows or triggers on
>> the main flow, since I really just want each element to be processed as
>> it arrives, using whatever the latest available schema is.
>>
>> I am currently trying this with:
>>
>> PCollection<KV<String, String>> schema = pipeline
>>     .apply("Read Schema",
>>         PubsubIO.readStrings().fromTopic("topic_for_schema"))
>>     .apply(Window.<String>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .discardingFiredPanes())
>>     .apply("Create Schema",
>>         ParDo.of(new SchemaDirectory.GenerateSchema()));
>>         // outputs around 100 elements for each input
>>
>> PCollectionView<Map<String, String>> schemaView =
>>     schema.apply(View.<String, String>asMap());
>>
>> pipeline
>>     .apply("Read Elements",
>>         PubsubIO.readStrings().fromTopic("topic_for_elements"))
>>     .apply("Parse Elements",
>>         ParDo.of(new DoFn<String, TableRow>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext c) {
>>                 String name = getNameFromElement(c.element());
>>                 String schema = c.sideInput(schemaView).get(name);
>>                 c.output(parse(c, schema));
>>             }
>>         }).withSideInputs(schemaView))
>>     .apply("Write to Table",
>>         BigQueryIO.writeTableRows());  // Other BQ options not copied.
>>
>> When running this pipeline, the
>> View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
>> stage never emits any elements, and so the pipeline never progresses. I
>> can see the messages at the input stage, but nothing appears on the
>> output.
>>
>> Any advice?
>>
>> Thanks,
>> -Kevin
>>
>>