Take a look at DoFn setup/teardown: those methods are called only once per DoFn instance rather than once per element, which makes it easier to write initialization code.
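A minimal sketch of what that lifecycle guarantee buys you (the Beam SDK itself isn't shown here; this plain-Java stand-in just mimics when the hooks fire, and the names SchemaParsingFn/loadSchemas are hypothetical — in Beam the hooks are the methods annotated @Setup, @ProcessElement and @Teardown):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

class SchemaParsingFn {
    // Counts setup invocations so the once-per-instance guarantee is visible.
    static final AtomicInteger setupCalls = new AtomicInteger();
    private Map<String, String> schemas;

    // In Beam this would be the @Setup method: runs once per DoFn instance,
    // so expensive initialization is amortized over many elements.
    public void setup() {
        setupCalls.incrementAndGet();
        schemas = loadSchemas();
    }

    // In Beam this would be the @ProcessElement method: runs once per element,
    // reusing the state built in setup().
    public String process(String name) {
        return schemas.getOrDefault(name, "<no schema>");
    }

    // In Beam this would be the @Teardown method: release resources here.
    public void teardown() {
        schemas = null;
    }

    // Stand-in for a real (expensive) schema load.
    private static Map<String, String> loadSchemas() {
        Map<String, String> m = new HashMap<>();
        m.put("event", "{\"fields\": []}");
        return m;
    }
}
```

However many elements an instance processes, setup() runs once, so the schema load is paid once per instance instead of once per element.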
Also, if the schema map is shared, have you thought of using a single static instance of Guava's LoadingCache shared amongst all the DoFn instances? You can also refresh the data stored within the cache periodically.

On Wed, Jun 14, 2017 at 10:39 PM, Kevin Peterson <[email protected]> wrote:

> Still gets stuck at the same place :/
>
> On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun (上海_中台研发部_数据平台部_基础数据部_唐觊隽) <[email protected]> wrote:
>
>> .triggering(
>>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
>>     .discardingFiredPanes().withAllowedLateness(Duration.ZERO));
>>
>> Try the trigger above
>>
>> *From:* Kevin Peterson [mailto:[email protected]]
>> *Sent:* June 15, 2017 2:39
>> *To:* [email protected]
>> *Subject:* Fwd: Creating side input map with global window
>>
>> Hi all,
>>
>> I am working on a (streaming) pipeline which reads elements from Pubsub, and schemas for those elements from a separate Pubsub topic. I'd like to be able to create a side input map from the schema topic, and have that available to the main pipeline for parsing. Each message on the schema topic contains all the schemas I care about, so for every new message, I want to generate a new map that will be available to the main pipeline (eventual consistency is fine). I don't have any windows or triggers on the main flow, since I really just want each element to be processed as it arrives, using whatever the latest schema available is.
>>
>> I am currently trying this with:
>>
>> PCollection<KV<String, String>> schema = pipeline
>>     .apply("Read Schema", PubsubIO.readStrings().fromTopic("topic_for_schema"))
>>     .apply(Window.<String>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .discardingFiredPanes())
>>     .apply("Create Schema", ParDo.of(new SchemaDirectory.GenerateSchema())); // outputs around 100 elements for each input
>>
>> PCollectionView<Map<String, String>> schemaView =
>>     schema.apply(View.<String, String>asMap());
>>
>> pipeline
>>     .apply("Read Elements", PubsubIO.readStrings().fromTopic("topic_for_elements"))
>>     .apply("Parse Elements", ParDo.of(new DoFn<String, TableRow>() {
>>         @ProcessElement
>>         public void processElement(ProcessContext c) {
>>             String name = getNameFromElement(c.element());
>>             String schema = c.sideInput(schemaView).get(name);
>>             c.output(parse(c, schema));
>>         }
>>     }).withSideInputs(schemaView))
>>     .apply("Write to Table", BigQueryIO.writeTableRows()); // Other BQ options not copied.
>>
>> When running this pipeline, the View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey stage never emits any elements, and so the pipeline never progresses. I can see the messages at the input stage, but nothing appears on the output.
>>
>> Any advice?
>>
>> Thanks,
>> -Kevin
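To make the shared-cache suggestion above concrete, here is a hand-rolled sketch using only the JDK (SchemaCache and fetchSchema are hypothetical names, and the lazy per-key load stands in for what Guava's CacheBuilder with refreshAfterWrite plus a CacheLoader would give you, including the periodic refresh):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SchemaCache {
    // Single static instance shared by every DoFn instance in the worker JVM,
    // so the schemas are fetched once per process rather than once per DoFn.
    private static final SchemaCache INSTANCE = new SchemaCache();
    private final Map<String, String> schemas = new ConcurrentHashMap<>();

    public static SchemaCache get() {
        return INSTANCE;
    }

    public String schemaFor(String name) {
        // Loads on first access for each key; Guava's LoadingCache would also
        // re-run the loader periodically to refresh stale entries.
        return schemas.computeIfAbsent(name, SchemaCache::fetchSchema);
    }

    // Stand-in for a real lookup against the schema source.
    private static String fetchSchema(String name) {
        return "{\"name\": \"" + name + "\"}";
    }
}
```

Every DoFn instance then calls SchemaCache.get().schemaFor(name) from processElement, and they all observe the same cached map.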
