Hi all,
I am working on a (streaming) pipeline that reads elements from Pub/Sub,
and the schemas for those elements from a separate Pub/Sub topic. I'd like to
create a side input map from the schema topic and make it available to the
main pipeline for parsing. Each message on the schema topic contains all the
schemas I care about, so for every new message I want to generate a new map
that will be available to the main pipeline (eventual consistency is fine).
I don't have any windows or triggers on the main flow, since I just want
each element to be processed as it arrives, using whatever the latest
available schema is.
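To pin down the "latest snapshot wins" behaviour described above independently of Beam, here is a minimal plain-Java sketch (not Beam code, and not how side inputs are implemented) of a holder that swaps in a complete new map for each schema message, so a reader always sees one whole snapshot. The class and method names (SchemaSnapshot, onSchemaMessage, schemaFor) and the one-"name=schema"-per-line message format are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration, not Beam: each schema message replaces the
// whole map, and lookups read whichever complete snapshot is current.
class SchemaSnapshot {
  private final AtomicReference<Map<String, String>> current =
      new AtomicReference<>(Collections.<String, String>emptyMap());

  // Assumed message format: one "name=schema" entry per line.
  void onSchemaMessage(String message) {
    Map<String, String> next = new HashMap<>();
    for (String line : message.split("\n")) {
      int eq = line.indexOf('=');
      if (eq > 0) {
        next.put(line.substring(0, eq), line.substring(eq + 1));
      }
    }
    // Publish the new map in a single step; readers never see a half-built map.
    current.set(Collections.unmodifiableMap(next));
  }

  // Returns null if the name is not in the latest snapshot.
  String schemaFor(String name) {
    return current.get().get(name);
  }
}
```

Because every message carries the full set of schemas, each snapshot replaces the previous one outright instead of being merged into it; a name dropped from a newer message therefore disappears from lookups.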
I am currently trying this with:
PCollection<KV<String, String>> schema = pipeline
    .apply("Read Schema",
        PubsubIO.readStrings().fromTopic("topic_for_schema"))
    .apply(Window.<String>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .discardingFiredPanes())
    // Outputs around 100 elements for each input message.
    .apply("Create Schema",
        ParDo.of(new SchemaDirectory.GenerateSchema()));

PCollectionView<Map<String, String>> schemaView =
    schema.apply(View.<String, String>asMap());

pipeline
    .apply("Read Elements",
        PubsubIO.readStrings().fromTopic("topic_for_elements"))
    .apply("Parse Elements",
        ParDo.of(new DoFn<String, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            String name = getNameFromElement(c.element());
            String schema = c.sideInput(schemaView).get(name);
            c.output(parse(c, schema));
          }
        }).withSideInputs(schemaView))
    .apply("Write to Table",
        BigQueryIO.writeTableRows()); // Other BQ options not copied.
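One detail in the "Parse Elements" step is that Map.get returns null when a name is not in the current schema map, and that null is then handed to parse. Here is a minimal plain-Java sketch of that lookup with the missing case made explicit. It runs against an ordinary Map rather than a Beam side input; the class name ElementParser and the "<name>|<payload>" element format are hypothetical stand-ins, since getNameFromElement and parse are not shown above.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for the body of the "Parse Elements" DoFn, written
// against a plain Map so it can run outside Beam.
class ElementParser {
  // Assumed element format: "<name>|<payload>". This is only a placeholder
  // for the pipeline's own getNameFromElement, which is not shown.
  static String nameFromElement(String element) {
    int sep = element.indexOf('|');
    return sep < 0 ? element : element.substring(0, sep);
  }

  // Map.get returns null for a name missing from the current schema map,
  // so surface that case as an empty Optional instead of passing null on.
  static Optional<String> schemaForElement(Map<String, String> schemas, String element) {
    return Optional.ofNullable(schemas.get(nameFromElement(element)));
  }
}
```

An empty result could then be logged or routed elsewhere instead of being parsed; which is appropriate depends on how the pipeline should treat elements whose schema has not arrived yet.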
When running this pipeline, the
View.AsMap/View.CreatePCollectionView/Combine.globally(Concatenate)/Combine.perKey(Concatenate)/GroupByKey
stage never emits any elements, so the pipeline never progresses. I can
see the messages at the input stage, but nothing appears on the output.
Any advice?
Thanks,
-Kevin