Yes, this is a streaming pipeline.

Here are some more details about the existing implementation vs. what we want to achieve.

Current implementation:
Reading from Pub/Sub:

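Pipeline input = Pipeline.create(options);

// readMessagesWithAttributesAndMessageId() yields PubsubMessage, not String,
// so the payload is decoded before merging with the BigQuery JSON strings.
PCollection<String> pubsubStream = input
        .apply("Read From Pubsub",
                PubsubIO.readMessagesWithAttributesAndMessageId()
                        .fromSubscription(inputSubscriptionId))
        .apply("Payload To String", MapElements.into(TypeDescriptors.strings())
                .via((PubsubMessage msg) -> new String(msg.getPayload(), StandardCharsets.UTF_8)));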


Reading from BigQuery:

PCollection<String> bqStream = input
        .apply("Read from BQ", BigQueryIO.readTableRows()
                .fromQuery(bqQuery)
                .usingStandardSql())
        .apply("JSon Transform", AsJsons.of(TableRow.class));


Merge the inputs:

PCollection<String> mergedInput = PCollectionList.of(pubsubStream)
        .and(bqStream)
        .apply("Merge Input", Flatten.pCollections());



Business Logic:

mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));



The above is what we currently use in our pipeline.

We want to make sure that we read from BigQuery first and pass bqStream
through BusinessLogic() before we start consuming pubsubStream, i.e. every
BigQuery row should be processed before any Pub/Sub message is.

Is there a way to achieve this?
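The ordering asked for here amounts to a gate: Pub/Sub elements that arrive before the BigQuery read is exhausted are held back, and released only after every BigQuery row has gone through the business logic. The plain-Java sketch below has no Beam dependency and models only that ordering; `BootstrapGate`, `onBootstrap`, `onStream` and `bootstrapComplete` are hypothetical names, and in a real pipeline the "bootstrap complete" signal would have to come from the bounded BigQuery read finishing. Within Beam itself, the `Wait.on(...)` transform is the usual building block for "process X only after Y's window is done"; whether it behaves as wanted with an unbounded Pub/Sub input in the global window is an assumption worth checking against the Beam documentation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, NOT part of Apache Beam: models "process all BigQuery
// rows first, then Pub/Sub messages" within a single process.
final class BootstrapGate<T> {
    private final Consumer<T> businessLogic;                   // stand-in for BusinessLogic()
    private final Deque<T> pendingStream = new ArrayDeque<>(); // Pub/Sub elements held back
    private boolean bootstrapDone = false;

    BootstrapGate(Consumer<T> businessLogic) {
        this.businessLogic = businessLogic;
    }

    // Bounded (BigQuery) elements go straight to the business logic.
    void onBootstrap(T element) {
        if (bootstrapDone) {
            throw new IllegalStateException("bootstrap already completed");
        }
        businessLogic.accept(element);
    }

    // Unbounded (Pub/Sub) elements are buffered until the bootstrap finishes.
    void onStream(T element) {
        if (bootstrapDone) {
            businessLogic.accept(element);
        } else {
            pendingStream.addLast(element);
        }
    }

    // Called once the bounded read is exhausted; drains the buffer in arrival order.
    void bootstrapComplete() {
        bootstrapDone = true;
        while (!pendingStream.isEmpty()) {
            businessLogic.accept(pendingStream.pollFirst());
        }
    }
}

public class Main {
    public static void main(String[] args) {
        List<String> processed = new ArrayList<>();
        BootstrapGate<String> gate = new BootstrapGate<>(processed::add);

        gate.onStream("pubsub-1");   // arrives early, held back
        gate.onBootstrap("bq-1");
        gate.onStream("pubsub-2");   // still held back
        gate.onBootstrap("bq-2");
        gate.bootstrapComplete();    // BigQuery read finished
        gate.onStream("pubsub-3");   // processed immediately

        // prints [bq-1, bq-2, pubsub-1, pubsub-2, pubsub-3]
        System.out.println(processed);
    }
}
```

Buffering in memory keeps the sketch simple; with a high Pub/Sub rate and a long BigQuery read the buffer can grow large, which is one reason to prefer a runner-managed mechanism over hand-rolled state.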


Thanks,

Sahil


On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax <re...@google.com> wrote:

> Can you explain this use case some more? Is this a streaming pipeline? If
> so, how are you reading from BigQuery?
>
> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <dev@beam.apache.org>
> wrote:
>
>> Hi,
>>
>> We have a requirement where we consume input from Pub/Sub (PubsubIO)
>> as well as from BigQuery (BigQueryIO).
>>
>> We want to make sure that we consume the BigQuery data first, before we
>> start consuming data from Pub/Sub. Is there a way to achieve this? Could you
>> please help with some code samples?
>>
>> Currently, we read data from BigQuery into a PCollection using BigQueryIO,
>> and read data from Pub/Sub using PubsubIO. We then merge them with the
>> Flatten transform as follows:
>>
>> PCollection pubsubKvPairs = ... // read from Pub/Sub using PubsubIO
>> PCollection bigqueryKvPairs = ... // read from BigQuery using BigQueryIO
>>
>> PCollection kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
>>         .apply("Merge Input", Flatten.pCollections());
>>
>>
>> Thanks,
>> Sahil
>>
>>
