Re: Consuming one PCollection before consuming another with Beam

2023-02-28 Thread Sahil Modak via dev
>>>>>> PCollection pubsubStream = input.apply("Read From Pubsub",
>>>>>> PubsubIO.readMessagesWithAttributesAndMessageId()
>>>>>>
>>>>>> .fromSubscription(inputSubscriptionId))
>>>>>>
>>>>>>
>>>>>> Reading from bigquery:
>>>>>>
>>>>>> PCollection bqStream = input.apply("Read from BQ", BigQueryIO
>>>>>> .readTableRows().fromQuery(bqQuery).usingStandardSql())
>>>>>>
>>>>>> .apply("JSon Transform", AsJsons.of(TableRow.class));
>>>>>>
>>>>>>
>>>>>> Merge the inputs:
>>>>>>
>>>>>> PCollection mergedInput = 
>>>>>> PCollectionList.of(pubsubStream).and(bqStream).apply("Merge Input", 
>>>>>> Flatten.pCollections());
>>>>>>
>>>>>>
>>>>>>
>>>>>> Business Logic:
>>>>>>
>>>>>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()))
>>>>>>
>>>>>>
>>>>>>
>>>>>> Above logic is what we use currently in our pipeline.
>>>>>>
>>>>>> We want to make sure that we read from BigQuery first & pass the 
>>>>>> bqStream through our BusinessLogic() before we start consuming 
>>>>>> pubsubStream.
>>>>>>
>>>>>> Is there a way to achieve this?
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>>
>>>>>> Sahil
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:
>>>>>>
>>>>>>> Can you explain this use case some more? Is this a streaming
>>>>>>> pipeline? If so, how are you reading from BigQuery?
>>>>>>>
>>>>>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>>>>>>> dev@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> We have a requirement wherein we are consuming input from pub/sub
>>>>>>>> (PubSubIO) as well as BQ (BQIO)
>>>>>>>>
>>>>>>>> We want to make sure that we consume the BQ stream first before we
>>>>>>>> start consuming the data from pub-sub. Is there a way to achieve this? 
>>>>>>>> Can
>>>>>>>> you please help with some code samples?
>>>>>>>>
>>>>>>>> Currently, we read data from big query using BigQueryIO into a
>>>>>>>> PCollection & also read data from pubsub using PubsubIO. We then use 
>>>>>>>> the
>>>>>>>> flatten transform in this manner.
>>>>>>>>
>>>>>>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>>>>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>>>>>>
>>>>>>>> kvPairs = 
>>>>>>>> PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge 
>>>>>>>> Input", Flatten.pCollections());
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Sahil
>>>>>>>>
>>>>>>>>


Re: Consuming one PCollection before consuming another with Beam

2023-02-27 Thread Sahil Modak via dev
We are trying to re-initialize the state specs in our BusinessLogic() DoFn
from BQ.
BQ holds the state data, and we would like to make sure that the state
specs in our BusinessLogic() DoFn are initialized before it starts
consuming from Pub/Sub.

This is to handle redeployment of the Dataflow jobs, so that state is
preserved and BusinessLogic() can continue working seamlessly as before.
All our DoFns operate in the global window and do not perform any
aggregation.

We are currently using Redis to preserve this state information, but we
would like to explore BQ as an alternative.
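
One way this could be sketched (an illustration only, not something settled in
this thread) is to turn the bounded BQ read into a map-valued side input and
lazily seed the per-key state inside BusinessLogic(). The column names
("state_key", "state_value"), the String key/value types, and the single
ValueState below are assumptions, and it presumes the merged input is keyed
the same way as the BQ rows. Since a ParDo generally does not process a
main-input window until that window's side input is ready, the bounded BQ
read would be consumed before Pub/Sub elements flow through.

import java.util.Map;
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;

// Bounded snapshot of the persisted state, keyed the same way as the DoFn.
// "state_key" and "state_value" are hypothetical column names.
PCollectionView<Map<String, String>> initialStateView = input
    .apply("Read state from BQ",
        BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
    .apply("To KV", MapElements
        .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
        .via((TableRow row) ->
            KV.of((String) row.get("state_key"), (String) row.get("state_value"))))
    .apply("As map side input", View.asMap());

class BusinessLogic extends DoFn<KV<String, String>, String> {
  private final PCollectionView<Map<String, String>> initialStateView;

  BusinessLogic(PCollectionView<Map<String, String>> initialStateView) {
    this.initialStateView = initialStateView;
  }

  @StateId("value")
  private final StateSpec<ValueState<String>> valueSpec =
      StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("value") ValueState<String> value) {
    if (value.read() == null) {
      // First time this key is seen after redeployment: seed from the BQ snapshot.
      String seed = c.sideInput(initialStateView).get(c.element().getKey());
      if (seed != null) {
        value.write(seed);
      }
    }
    // ... existing business logic ...
  }
}

mergedInput.apply("Business Logic",
    ParDo.of(new BusinessLogic(initialStateView)).withSideInputs(initialStateView));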

On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles  wrote:

> My suggestion is to try to solve the problem in terms of what you want to
> compute. Instead of trying to control the operational aspects like "read
> all the BQ before reading Pubsub" there is presumably some reason that the
> BQ data naturally "comes first", for example if its timestamps are earlier
> or if there is a join or an aggregation that must include it. Whenever you
> think you want to set up an operational dependency between two things that
> "happen" in a pipeline, it is often best to pivot your thinking to the data
> and what you are trying to compute, and the built-in dependencies will
> solve the ordering problems.
>
> So - is there a way to describe your problem in terms of the data and what
> you are trying to compute?
>
> Kenn
>
> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev 
> wrote:
>
>> First PCollections are completely unordered, so there is no guarantee on
>> what order you'll see events in the flattened PCollection.
>>
>> There may be ways to process the BigQuery data in a separate transform
>> first, but it depends on the structure of the data. How large is the
>> BigQuery table? Are you doing any windowed aggregations here?
>>
>> Reuven
>>
>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak 
>> wrote:
>>
>>> Yes, this is a streaming pipeline.
>>>
>>> Some more details about existing implementation v/s what we want to
>>> achieve.
>>>
>>> Current implementation:
>>> Reading from pub-sub:
>>>
>>> Pipeline input = Pipeline.create(options);
>>>
>>> PCollection pubsubStream = input.apply("Read From Pubsub", 
>>> PubsubIO.readMessagesWithAttributesAndMessageId()
>>>
>>> .fromSubscription(inputSubscriptionId))
>>>
>>>
>>> Reading from bigquery:
>>>
>>> PCollection bqStream = input.apply("Read from BQ", BigQueryIO
>>> .readTableRows().fromQuery(bqQuery).usingStandardSql())
>>>
>>> .apply("JSon Transform", AsJsons.of(TableRow.class));
>>>
>>>
>>> Merge the inputs:
>>>
>>> PCollection mergedInput = 
>>> PCollectionList.of(pubsubStream).and(bqStream).apply("Merge Input", 
>>> Flatten.pCollections());
>>>
>>>
>>>
>>> Business Logic:
>>>
>>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()))
>>>
>>>
>>>
>>> Above logic is what we use currently in our pipeline.
>>>
>>> We want to make sure that we read from BigQuery first & pass the bqStream 
>>> through our BusinessLogic() before we start consuming pubsubStream.
>>>
>>> Is there a way to achieve this?
>>>
>>>
>>> Thanks,
>>>
>>> Sahil
>>>
>>>
>>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:
>>>
>>>> Can you explain this use case some more? Is this a streaming pipeline?
>>>> If so, how are you reading from BigQuery?
>>>>
>>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>>>> dev@beam.apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> We have a requirement wherein we are consuming input from pub/sub
>>>>> (PubSubIO) as well as BQ (BQIO)
>>>>>
>>>>> We want to make sure that we consume the BQ stream first before we
>>>>> start consuming the data from pub-sub. Is there a way to achieve this? Can
>>>>> you please help with some code samples?
>>>>>
>>>>> Currently, we read data from big query using BigQueryIO into a
>>>>> PCollection & also read data from pubsub using PubsubIO. We then use the
>>>>> flatten transform in this manner.
>>>>>
>>>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>>>
>>>>> kvPairs = 
>>>>> PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge 
>>>>> Input", Flatten.pCollections());
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Sahil
>>>>>
>>>>>


Re: Consuming one PCollection before consuming another with Beam

2023-02-24 Thread Sahil Modak via dev
Yes, this is a streaming pipeline.

Some more details about the existing implementation vs. what we want to achieve.

Current implementation:

Reading from Pub/Sub:

Pipeline input = Pipeline.create(options);

PCollection pubsubStream = input.apply("Read From Pubsub",
    PubsubIO.readMessagesWithAttributesAndMessageId()
        .fromSubscription(inputSubscriptionId));


Reading from BigQuery:

PCollection bqStream = input
    .apply("Read from BQ",
        BigQueryIO.readTableRows().fromQuery(bqQuery).usingStandardSql())
    .apply("JSon Transform", AsJsons.of(TableRow.class));


Merge the inputs:

PCollection mergedInput = PCollectionList.of(pubsubStream).and(bqStream)
    .apply("Merge Input", Flatten.pCollections());



Business Logic:

mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));



The above is what we currently use in our pipeline.

We want to make sure that we read from BigQuery first and pass bqStream
through our BusinessLogic() before we start consuming pubsubStream.

Is there a way to achieve this?
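
One possibility that might be worth evaluating (a sketch only, not something
confirmed in this thread) is Beam's Wait.on() transform, which holds back a
PCollection until a signal PCollection has been fully computed for the
corresponding window. Using the bounded bqStream as the signal, the Pub/Sub
elements would not reach BusinessLogic() until the BigQuery read has
completed. The variable pubsubJson below is hypothetical: it stands for the
Pub/Sub stream mapped to the same element type as bqStream, which the Flatten
requires anyway. Whether Wait.on() behaves acceptably with an unbounded main
input in the global window would need to be verified on Dataflow.

import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// Sketch: gate the Pub/Sub branch on completion of the bounded BQ read.
// pubsubJson (hypothetical) is the Pub/Sub stream converted to the same
// JSON-string element type as bqStream.
PCollection<String> gatedPubsubStream =
    pubsubJson.apply("Wait for BQ", Wait.on(bqStream));

PCollection<String> mergedInput =
    PCollectionList.of(gatedPubsubStream).and(bqStream)
        .apply("Merge Input", Flatten.pCollections());

mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));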


Thanks,

Sahil


On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax  wrote:

> Can you explain this use case some more? Is this a streaming pipeline? If
> so, how are you reading from BigQuery?
>
> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev 
> wrote:
>
>> Hi,
>>
>> We have a requirement wherein we are consuming input from pub/sub
>> (PubSubIO) as well as BQ (BQIO)
>>
>> We want to make sure that we consume the BQ stream first before we start
>> consuming the data from pub-sub. Is there a way to achieve this? Can you
>> please help with some code samples?
>>
>> Currently, we read data from big query using BigQueryIO into a
>> PCollection & also read data from pubsub using PubsubIO. We then use the
>> flatten transform in this manner.
>>
>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>
>> kvPairs = 
>> PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs).apply("Merge Input", 
>> Flatten.pCollections());
>>
>>
>> Thanks,
>> Sahil
>>
>>


Consuming one PCollection before consuming another with Beam

2023-02-23 Thread Sahil Modak via dev
Hi,

We have a requirement wherein we are consuming input from Pub/Sub
(PubsubIO) as well as BQ (BigQueryIO).

We want to make sure that we consume the BQ stream before we start
consuming the data from Pub/Sub. Is there a way to achieve this? Can you
please help with some code samples?

Currently, we read data from BigQuery using BigQueryIO into a PCollection
and also read data from Pub/Sub using PubsubIO. We then use the Flatten
transform in this manner:

PCollection pubsubKvPairs = ...   // read from Pub/Sub using PubsubIO
PCollection bigqueryKvPairs = ... // read from BigQuery using BigQueryIO

PCollection kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
    .apply("Merge Input", Flatten.pCollections());


Thanks,
Sahil