First, PCollections are completely unordered, so there is no guarantee about
the order in which you'll see events in the flattened PCollection.

There may be ways to process the BigQuery data in a separate transform
first, but it depends on the structure of the data. How large is the
BigQuery table? Are you doing any windowed aggregations here?


On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <> wrote:

> Yes, this is a streaming pipeline.
>
> Some more details about the existing implementation vs. what we want to
> achieve.
>
> Current implementation:
>
> Reading from Pub/Sub:
>
>     Pipeline input = Pipeline.create(options);
>     PCollection<String> pubsubStream = input.apply("Read From Pubsub",
>         PubsubIO.readMessagesWithAttributesAndMessageId()
>             .fromSubscription(inputSubscriptionId));
>
> Reading from BigQuery:
>
>     PCollection<String> bqStream = input.apply("Read from BQ", BigQueryIO
>             .readTableRows().fromQuery(bqQuery).usingStandardSql())
>         .apply("JSon Transform", AsJsons.of(TableRow.class));
>
> Merge the inputs:
>
>     PCollection<String> mergedInput =
>         PCollectionList.of(pubsubStream).and(bqStream).apply("Merge Input",
>             Flatten.pCollections());
>
> Business logic:
>
>     mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>
> The above logic is what we currently use in our pipeline.
>
> We want to make sure that we read from BigQuery first and pass bqStream
> through our BusinessLogic() before we start consuming pubsubStream.
>
> Is there a way to achieve this?
>
> Thanks,
> Sahil
>
> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax <> wrote:
>> Can you explain this use case some more? Is this a streaming pipeline? If
>> so, how are you reading from BigQuery?
>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <>
>> wrote:
>>> Hi,
>>>
>>> We have a requirement wherein we are consuming input from Pub/Sub
>>> (PubsubIO) as well as BigQuery (BigQueryIO).
>>>
>>> We want to make sure that we consume the BigQuery stream first, before we
>>> start consuming the data from Pub/Sub. Is there a way to achieve this? Can
>>> you please help with some code samples?
>>>
>>> Currently, we read data from BigQuery using BigQueryIO into a
>>> PCollection, and also read data from Pub/Sub using PubsubIO. We then use
>>> the Flatten transform in this manner:
>>>
>>>     PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>     PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>     kvPairs =
>>>         PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge Input",
>>>             Flatten.pCollections());
>>>
>>> Thanks,
>>> Sahil
