Yes, this is a streaming pipeline.
Here are some more details about our existing implementation versus what we want to achieve.
Current implementation:
Reading from Pub/Sub:
    Pipeline input = Pipeline.create(options);
    // Note: this read emits PubsubMessage elements; the conversion to String is not shown here.
    PCollection<String> pubsubStream = input.apply("Read From Pubsub",
        PubsubIO.readMessagesWithAttributesAndMessageId()
            .fromSubscription(inputSubscriptionId));
Reading from BigQuery:
    PCollection<String> bqStream = input.apply("Read from BQ", BigQueryIO
            .readTableRows().fromQuery(bqQuery).usingStandardSql())
        .apply("JSon Transform", AsJsons.of(TableRow.class));
Merge the inputs:
    PCollection<String> mergedInput =
        PCollectionList.of(pubsubStream).and(bqStream).apply("Merge Input",
            Flatten.pCollections());
Business Logic:
    mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
The logic above is what we currently use in our pipeline.
We want to make sure that we read from BigQuery first and pass the
bqStream through our BusinessLogic() before we start consuming
pubsubStream.
Is there a way to achieve this?
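To make the ordering requirement concrete, here is a minimal plain-Java sketch (not Beam code) of the behavior we are after. The class OrderedConsumptionSketch, the method consumeInOrder, and the string-based businessLogic stand-in are hypothetical names used only for illustration. The queue stands in for Pub/Sub messages that have already arrived while the BigQuery rows are still being processed: they should be buffered, not dropped, and handled only after every BigQuery row has gone through the business logic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Plain-Java illustration only; none of these names are Beam APIs.
final class OrderedConsumptionSketch {

    // Hypothetical stand-in for our BusinessLogic DoFn.
    static String businessLogic(String element) {
        return "processed:" + element;
    }

    // Processes every bounded (BigQuery-like) row first, then drains the
    // messages that were buffered from the streaming (Pub/Sub-like) source.
    static List<String> consumeInOrder(List<String> boundedRows,
                                       Queue<String> bufferedMessages) {
        List<String> out = new ArrayList<>();
        for (String row : boundedRows) {
            out.add(businessLogic(row));
        }
        String message;
        while ((message = bufferedMessages.poll()) != null) {
            out.add(businessLogic(message));
        }
        return out;
    }

    public static void main(String[] args) {
        // The messages are enqueued before any row is processed,
        // yet they must come out after all of the rows.
        Queue<String> pubsub = new ArrayDeque<>(Arrays.asList("msg-1", "msg-2"));
        List<String> result = consumeInOrder(Arrays.asList("row-1", "row-2"), pubsub);
        System.out.println(result);
        // prints [processed:row-1, processed:row-2, processed:msg-1, processed:msg-2]
    }
}
```

In the real pipeline the open question is how to get the same guarantee from Beam itself, since Flatten makes no promise about the relative order of its inputs.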
Thanks,
Sahil
On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax <[email protected]> wrote:
> Can you explain this use case some more? Is this a streaming pipeline? If
> so, how are you reading from BigQuery?
>
> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <[email protected]>
> wrote:
>
>> Hi,
>>
>> We have a requirement wherein we consume input from Pub/Sub
>> (PubsubIO) as well as BigQuery (BigQueryIO).
>>
>> We want to make sure that we consume the BQ stream first before we start
>> consuming the data from Pub/Sub. Is there a way to achieve this? Can you
>> please help with some code samples?
>>
>> Currently, we read data from BigQuery using BigQueryIO into a
>> PCollection and also read data from Pub/Sub using PubsubIO. We then use the
>> Flatten transform in this manner:
>>
>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>
>> kvPairs =
>> PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge Input",
>> Flatten.pCollections());
>>
>>
>> Thanks,
>> Sahil