Still gets stuck at the same place :/

On Wed, Jun 14, 2017 at 9:45 PM, Tang Jijun (Shanghai, Middle Platform R&D Dept., Data Platform Dept., Basic Data Dept.) <[email protected]> wrote:
> .triggering(
>     AfterProcessingTime.pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(1)))
> .discardingFiredPanes().withAllowedLateness(Duration.ZERO));
>
> Try the trigger above.
>
> From: Kevin Peterson [mailto:[email protected]]
> Sent: June 15, 2017 2:39
> To: [email protected]
> Subject: Fwd: Creating side input map with global window
>
> Hi all,
>
> I am working on a (streaming) pipeline which reads elements from Pubsub,
> and schemas for those elements from a separate Pubsub topic. I'd like to be
> able to create a side input map from the schema topic, and have that
> available to the main pipeline for parsing. Each message on the schema
> Pubsub topic contains all schemas I care about, so for every new message, I
> want to generate a new map that will be available to the main pipeline
> (eventual consistency is fine). I don't have any windows or triggers on the
> main flow, since I really just want each element to be processed as it
> arrives, using whatever the latest available schema is.
>
> I am currently trying this with:
>
> PCollection<KV<String, String>> schema = pipeline
>     .apply("Read Schema",
>         PubsubIO.readStrings().fromTopic("topic_for_schema"))
>     .apply(Window.<String>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>         .discardingFiredPanes())
>     .apply("Create Schema", ParDo.of(new SchemaDirectory.GenerateSchema()));
>     // outputs around 100 elements for each input
>
> PCollectionView<Map<String, String>> schemaView =
>     schema.apply(View.<String, String>asMap());
>
> pipeline
>     .apply("Read Elements",
>         PubsubIO.readStrings().fromTopic("topic_for_elements"))
>     .apply("Parse Elements",
>         ParDo.of(new DoFn<String, TableRow>() {
>           @ProcessElement
>           public void processElement(ProcessContext c) {
>             String name = getNameFromElement(c.element());
>             String schema = c.sideInput(schemaView).get(name);
>             c.output(parse(c, schema));
>           }
>         }).withSideInputs(schemaView))
>     .apply("Write to Table",
>         BigQueryIO.writeTableRows()) // Other BQ options not copied.
>
> When running this pipeline, the
> View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
> stage never emits any elements, and so the pipeline never progresses. I
> can see the messages at the input stage, but nothing appears on the output.
>
> Any advice?
>
> Thanks,
> -Kevin
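
For clarity, the behavior described in the quoted message (each schema message replaces the whole map, and every element is parsed against whichever map is current) can be sketched outside Beam in plain Java. This is only an illustration of the intended semantics, not Beam code and not a fix for the stuck stage; the class and method names (LatestSchemaSketch, onSchemaMessage, schemaFor) are hypothetical and do not come from the pipeline or from any Beam API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration of "latest full schema map wins" semantics.
public class LatestSchemaSketch {
    // Most recent complete schema map; empty until the first schema message arrives.
    private final AtomicReference<Map<String, String>> latest =
            new AtomicReference<>(Collections.<String, String>emptyMap());

    // Each schema message carries all schemas, so the whole map is swapped at once.
    public void onSchemaMessage(Map<String, String> allSchemas) {
        latest.set(Collections.unmodifiableMap(new HashMap<>(allSchemas)));
    }

    // Looks up a schema in the current snapshot; returns null if the name is unknown.
    public String schemaFor(String name) {
        return latest.get().get(name);
    }

    public static void main(String[] args) {
        LatestSchemaSketch sketch = new LatestSchemaSketch();

        Map<String, String> first = new HashMap<>();
        first.put("user", "v1");
        sketch.onSchemaMessage(first);
        System.out.println(sketch.schemaFor("user")); // prints "v1"

        Map<String, String> second = new HashMap<>();
        second.put("user", "v2");
        second.put("order", "v1");
        sketch.onSchemaMessage(second);
        System.out.println(sketch.schemaFor("user")); // prints "v2"
    }
}
```

Note that a lookup can return null before the first schema message has been seen, or for a name missing from the current map, so the element-parsing step would need to decide how to handle that case.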
