It sounds like what you're doing here might be best done outside the Beam
model. Instead of performing the initial computation by reading from BQ into a
PCollection, perform it using the BigQuery client library, in the same
manner as you currently load the data from Redis.
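
A minimal sketch of that idea in plain Java, under stated assumptions:
BusinessLogic here is a simplified stand-in rather than a real Beam DoFn, and
the stateLoader supplier is a hypothetical placeholder for whatever BigQuery
client library query returns the saved state. The class loads the saved state
once, before it handles its first element, so it needs no ordering between
pipeline inputs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

public class LazyStateInit {

    /** Simplified stand-in for the BusinessLogic DoFn (not a real Beam class). */
    public static class BusinessLogic {
        // Hypothetical loader: in practice this would wrap a BigQuery client query.
        private final Supplier<Map<String, String>> stateLoader;
        private Map<String, String> state; // null until the first element arrives

        public BusinessLogic(Supplier<Map<String, String>> stateLoader) {
            this.stateLoader = stateLoader;
        }

        /** Loads the saved state exactly once, on first use. */
        private synchronized void ensureLoaded() {
            if (state == null) {
                state = new HashMap<>(stateLoader.get());
            }
        }

        /** Handles one element, updating the state kept for its key. */
        public synchronized String process(String key, String value) {
            ensureLoaded();
            String previous = state.put(key, value);
            return key + ": " + previous + " -> " + value;
        }
    }

    public static void main(String[] args) {
        // The lambda stands in for a query against the saved-state table.
        BusinessLogic fn = new BusinessLogic(() -> Map.of("user-1", "saved"));
        System.out.println(fn.process("user-1", "new"));
        System.out.println(fn.process("user-2", "first"));
    }
}
```

In an actual pipeline the load would presumably live in the DoFn's @Setup
method or at the start of @ProcessElement, and anything the DoFn holds on to
would need to be serializable.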

On Mon, Feb 27, 2023 at 2:07 PM Sahil Modak via dev <dev@beam.apache.org>
wrote:

> We are trying to re-initialize our state specs in the BusinessLogic() DoFn
> from BQ.
> BQ has data about the state spec, and we would like to make sure that the
> state specs in our BusinessLogic() DoFn are initialized before it starts
> consuming from Pub/Sub.
>
> This is to handle redeployment of the Dataflow jobs, so that the states
> are preserved and BusinessLogic() can keep working seamlessly, as it did
> previously. All our DoFns operate in a global window and do not
> perform any aggregation.
>
> We are currently using Redis to preserve the state spec information but
> would like to explore using BQ as an alternative to Redis.
>
> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> My suggestion is to try to solve the problem in terms of what you want to
>> compute. Instead of trying to control the operational aspects like "read
>> all the BQ before reading Pubsub" there is presumably some reason that the
>> BQ data naturally "comes first", for example if its timestamps are earlier
>> or if there is a join or an aggregation that must include it. Whenever you
>> think you want to set up an operational dependency between two things that
>> "happen" in a pipeline, it is often best to pivot your thinking to the data
>> and what you are trying to compute, and the built-in dependencies will
>> solve the ordering problems.
>>
>> So - is there a way to describe your problem in terms of the data and
>> what you are trying to compute?
>>
>> Kenn
>>
>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev <dev@beam.apache.org>
>> wrote:
>>
>>> First, PCollections are completely unordered, so there is no guarantee on
>>> what order you'll see events in the flattened PCollection.
>>>
>>> There may be ways to process the BigQuery data in a separate transform
>>> first, but it depends on the structure of the data. How large is the
>>> BigQuery table? Are you doing any windowed aggregations here?
>>>
>>> Reuven
>>>
>>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
>>>> Yes, this is a streaming pipeline.
>>>>
>>>> Some more details about the existing implementation vs. what we want to
>>>> achieve.
>>>>
>>>> Current implementation:
>>>> Reading from pub-sub:
>>>>
>>>> Pipeline input = Pipeline.create(options);
>>>>
>>>> PCollection<String> pubsubStream = input.apply("Read From Pubsub",
>>>>         PubsubIO.readMessagesWithAttributesAndMessageId()
>>>>                 .fromSubscription(inputSubscriptionId));
>>>>
>>>>
>>>> Reading from bigquery:
>>>>
>>>> PCollection<String> bqStream = input.apply("Read from BQ", BigQueryIO
>>>>         .readTableRows().fromQuery(bqQuery).usingStandardSql())
>>>>         .apply("JSon Transform", AsJsons.of(TableRow.class));
>>>>
>>>>
>>>> Merge the inputs:
>>>>
>>>> PCollection<String> mergedInput = PCollectionList.of(pubsubStream)
>>>>         .and(bqStream)
>>>>         .apply("Merge Input", Flatten.pCollections());
>>>>
>>>>
>>>>
>>>> Business Logic:
>>>>
>>>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>>>>
>>>>
>>>>
>>>> The above logic is what we currently use in our pipeline.
>>>>
>>>> We want to make sure that we read from BigQuery first and pass the bqStream
>>>> through our BusinessLogic() before we start consuming pubsubStream.
>>>>
>>>> Is there a way to achieve this?
>>>>
>>>>
>>>> Thanks,
>>>>
>>>> Sahil
>>>>
>>>>
>>>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Can you explain this use case some more? Is this a streaming pipeline?
>>>>> If so, how are you reading from BigQuery?
>>>>>
>>>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <
>>>>> dev@beam.apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a requirement wherein we are consuming input from pub/sub
>>>>>> (PubSubIO) as well as BQ (BQIO)
>>>>>>
>>>>>> We want to make sure that we consume the BQ stream first before we
>>>>>> start consuming the data from pub-sub. Is there a way to achieve this?
>>>>>> Can you please help with some code samples?
>>>>>>
>>>>>> Currently, we read data from BigQuery using BigQueryIO into a
>>>>>> PCollection and also read data from Pub/Sub using PubsubIO. We then use
>>>>>> the Flatten transform in this manner:
>>>>>>
>>>>>> PCollection pubsubKvPairs = reads from pubsub using PubsubIO
>>>>>> PCollection bigqueryKvPairs = reads from bigquery using BigQueryIO
>>>>>>
>>>>>> kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
>>>>>>         .apply("Merge Input", Flatten.pCollections());
>>>>>>
>>>>>>
>>>>>> Thanks,
>>>>>> Sahil
>>>>>>
>>>>>>
