How large is the state spec data stored in BQ? If the size isn't too large, you can read it from BQ and make it a side input into the DoFn.
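
A minimal sketch of that side-input suggestion, assuming the Beam Java SDK (matching the code later in the thread). The query, the "key"/"value" column names, and all variable names here are illustrative assumptions, not the poster's actual schema:

```java
import java.util.Map;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;

// Hedged sketch: read the state-spec rows from BigQuery once, turn them
// into a map-shaped side input, and look them up inside the streaming DoFn.
// Requires the Beam GCP IO module on the classpath.
PCollectionView<Map<String, String>> stateSpecView =
    input
        .apply("Read state specs", BigQueryIO.readTableRows()
            .fromQuery(stateSpecQuery).usingStandardSql())
        .apply("To KV", MapElements
            .into(TypeDescriptors.kvs(
                TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via(row -> KV.of((String) row.get("key"),
                              (String) row.get("value"))))
        .apply("As map", View.asMap());

pubsubStream.apply("Business Logic",
    ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void process(ProcessContext c) {
        Map<String, String> specs = c.sideInput(stateSpecView);
        // Lazily seed the per-key state from 'specs' on the first element
        // seen for a key, then process c.element() as before.
      }
    }).withSideInputs(stateSpecView));
```

Because the side input is computed from a bounded BQ read, the runner will not fire the DoFn on a Pub/Sub element until the side input is ready, which gives the "BQ data available first" behavior without ordering the streams themselves.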
On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <smo...@paloaltonetworks.com> wrote:

> We are trying to re-initialize our state specs in the BusinessLogic() DoFn
> from BQ. BQ has data about the state spec, and we would like to make sure
> that the state specs in our BusinessLogic() DoFn are initialized before it
> starts consuming the pub/sub.
>
> This is for handling the case of redeployment of the Dataflow jobs so that
> the states are preserved and BusinessLogic() can work seamlessly as it was
> previously. All our DoFns are operating in a global window and do not
> perform any aggregation.
>
> We are currently using Redis to preserve the state spec information but
> would like to explore using BQ as an alternative to Redis.
>
> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> My suggestion is to try to solve the problem in terms of what you want to
>> compute. Instead of trying to control operational aspects like "read all
>> the BQ before reading Pubsub", there is presumably some reason that the BQ
>> data naturally "comes first", for example if its timestamps are earlier or
>> if there is a join or an aggregation that must include it. Whenever you
>> think you want to set up an operational dependency between two things that
>> "happen" in a pipeline, it is often best to pivot your thinking to the data
>> and what you are trying to compute, and the built-in dependencies will
>> solve the ordering problems.
>>
>> So - is there a way to describe your problem in terms of the data and
>> what you are trying to compute?
>>
>> Kenn
>>
>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev <dev@beam.apache.org> wrote:
>>
>>> First, PCollections are completely unordered, so there is no guarantee on
>>> what order you'll see events in the flattened PCollection.
>>>
>>> There may be ways to process the BigQuery data in a separate transform
>>> first, but it depends on the structure of the data. How large is the
>>> BigQuery table?
>>> Are you doing any windowed aggregations here?
>>>
>>> Reuven
>>>
>>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <smo...@paloaltonetworks.com> wrote:
>>>
>>>> Yes, this is a streaming pipeline.
>>>>
>>>> Some more details about the existing implementation vs. what we want
>>>> to achieve.
>>>>
>>>> Current implementation:
>>>>
>>>> Reading from Pub/Sub:
>>>>
>>>> Pipeline input = Pipeline.create(options);
>>>>
>>>> PCollection<String> pubsubStream = input.apply("Read From Pubsub",
>>>>     PubsubIO.readMessagesWithAttributesAndMessageId()
>>>>         .fromSubscription(inputSubscriptionId));
>>>>
>>>> Reading from BigQuery:
>>>>
>>>> PCollection<String> bqStream = input.apply("Read from BQ", BigQueryIO
>>>>         .readTableRows().fromQuery(bqQuery).usingStandardSql())
>>>>     .apply("JSon Transform", AsJsons.of(TableRow.class));
>>>>
>>>> Merging the inputs:
>>>>
>>>> PCollection<String> mergedInput =
>>>>     PCollectionList.of(pubsubStream).and(bqStream)
>>>>         .apply("Merge Input", Flatten.pCollections());
>>>>
>>>> Business logic:
>>>>
>>>> mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
>>>>
>>>> The above is what we currently use in our pipeline.
>>>>
>>>> We want to make sure that we read from BigQuery first and pass the
>>>> bqStream through our BusinessLogic() before we start consuming
>>>> pubsubStream.
>>>>
>>>> Is there a way to achieve this?
>>>>
>>>> Thanks,
>>>> Sahil
>>>>
>>>> On Thu, Feb 23, 2023 at 10:21 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Can you explain this use case some more? Is this a streaming pipeline?
>>>>> If so, how are you reading from BigQuery?
>>>>>
>>>>> On Thu, Feb 23, 2023 at 10:06 PM Sahil Modak via dev <dev@beam.apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> We have a requirement wherein we are consuming input from Pub/Sub
>>>>>> (PubsubIO) as well as BQ (BigQueryIO).
>>>>>>
>>>>>> We want to make sure that we consume the BQ stream first before we
>>>>>> start consuming the data from Pub/Sub. Is there a way to achieve this?
>>>>>> Can you please help with some code samples?
>>>>>>
>>>>>> Currently, we read data from BigQuery using BigQueryIO into a
>>>>>> PCollection and also read data from Pub/Sub using PubsubIO. We then
>>>>>> use the Flatten transform in this manner:
>>>>>>
>>>>>> PCollection pubsubKvPairs = ... // reads from Pub/Sub using PubsubIO
>>>>>> PCollection bigqueryKvPairs = ... // reads from BigQuery using BigQueryIO
>>>>>>
>>>>>> kvPairs = PCollectionList.of(pubsubKvPairs).and(bigqueryKvPairs)
>>>>>>     .apply("Merge Input", Flatten.pCollections());
>>>>>>
>>>>>> Thanks,
>>>>>> Sahil
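
[Editor's note, not part of the original thread: Beam also ships a `Wait.on` transform (`org.apache.beam.sdk.transforms.Wait`) that expresses exactly this kind of "B after A finishes" dependency. Nobody in the thread suggests it, so treat the following as a hedged sketch under the thread's variable names, not a confirmed solution for this pipeline:]

```java
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// Hedged sketch: hold back the Pub/Sub elements until the bounded BigQuery
// read has completed. Whether the gate opens as intended in streaming mode
// depends on the runner advancing the bounded read's watermark to infinity.
PCollection<String> gatedPubsub =
    pubsubStream.apply("Wait for BQ", Wait.on(bqStream));

PCollection<String> mergedInput =
    PCollectionList.of(gatedPubsub).and(bqStream)
        .apply("Merge Input", Flatten.pCollections());

mergedInput.apply("Business Logic", ParDo.of(new BusinessLogic()));
```

With this shape, the BQ elements flow into BusinessLogic() immediately while the Pub/Sub elements are buffered until the BQ read signals completion, which matches the "BQ first, then Pub/Sub" requirement without relying on any ordering inside the flattened PCollection.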