Yes, this is a streaming pipeline. Here are some more details about our existing implementation vs. what we want to achieve.

Current implementation:

Reading from Pub/Sub:

    Pipeline input = Pipeline.create(options);

    // Note: readMessagesWithAttributesAndMessageId() yields PubsubMessage
    // elements; the conversion to String is not shown here.
    PCollection<String> pubsubStream = input.apply("Read From Pubsub",
        PubsubIO.readMessagesWithAttributesAndMessageId()
            .fromSubscription(inputSubscriptionId));

Reading from BigQuery:

    PCollection<String> bqStream = input.apply("Read from BQ",
        BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
        .apply("JSon Transform", AsJsons.of(TableRow.class));

Merge the inputs:

    PCollection<String> mergedInput = PCollectionList.of(pubsubStream).and(bqStream)
        .apply("Merge Input", Flatten.pCollections());

Business logic:

    mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));

The above logic is what we currently use in our pipeline.

We want to make sure that we read from BigQuery first and pass bqStream through our BusinessLogic() before we start consuming pubsubStream.

Is there a way to achieve this?

Thanks,
Sahil

On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax <re...@google.com> wrote:

> Can you explain this use case some more? Is this a streaming pipeline? If
> so, how are you reading from BigQuery?
>
> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <dev@beam.apache.org>
> wrote:
>
>> Hi,
>>
>> We have a requirement wherein we are consuming input from pub/sub
>> (PubSubIO) as well as BQ (BQIO)
>>
>> We want to make sure that we consume the BQ stream first before we start
>> consuming the data from pub-sub. Is there a way to achieve this? Can you
>> please help with some code samples?
>>
>> Currently, we read data from big query using BigQueryIO into a
>> PCollection & also read data from pubsub using PubsubIO. We then use the
>> flatten transform in this manner.
>>
>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>
>> kvPairs =
>> PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge Input",
>> Flatten.pCollections());
>>
>>
>> Thanks,
>> Sahil
>>
>>