My attempt to fix https://github.com/apache/beam/issues/25598: https://github.com/apache/beam/pull/30728
On Thu, Mar 21, 2024 at 10:35 AM Ondřej Pánek <ondrej.pa...@bighub.cz> wrote:

> Hello Jan,
>
> thanks a lot for the detailed answer! During the last week, the customer
> changed their requirements: the output should now go to BigQuery. That, as
> we understand, is available even in the Beam Python SDK, and we already
> have a PoC for it.
>
> You have a very good point with bullet c). That is indeed our case now.
> The data will really only be transferred from topics on the Kafka
> MirrorMaker cluster (managed by the customer) sitting in GCP to BigQuery,
> rather than undergoing any heavy transformations. However, the number of
> topics is quite large (hundreds), and the customer wants additional
> flexibility when adding/removing topics, so the transfer job should pick
> up such changes dynamically.
>
> TBH, we also started to think about PySpark Streaming on Dataproc and
> created a PoC there as well. It looks more lightweight than Dataflow &
> Beam for the initial runs.
>
> Also, and you will definitely know better, it looks like the offset
> management in Beam/Dataflow for streaming is a bit of a "black box"
> compared to the external storage of offsets in Spark Streaming. The
> problem I'm having with Dataflow now is that after the job's removal, the
> internal state, and hence the offsets, is reset, and one needs to make
> sure (somehow?) the consumed data is not duplicated/lost in case of a
> Dataflow job restart.
>
> Another Kafka cluster with Kafka Connect is not really an option, again
> based on the customer's requirements. The Data Engineering team wants to
> have full control over this ingestion solution, and Kafka cluster
> management is not in their scope; what's more, neither is Java in general.
>
> Thanks for the answers and opinions so far!
>
> Best,
>
> Ondřej
>
> From: Jan Lukavský <je...@seznam.cz>
> Date: Thursday, March 14, 2024 at 14:13
> To: user@beam.apache.org
> Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
>
> Self-correction: as you are using a streaming pipeline without final
> watermark emission (I suppose), option (a) will not work. Patching the
> sink to support generic windowing would probably be much more involved.
>
> On 3/14/24 14:07, Jan Lukavský wrote:
>
> Hi Ondřej,
>
> I'll start with a disclaimer: I'm not exactly an expert on either the
> Python SDK or ParquetIO, so please take these just as suggestions from
> the top of my head.
>
> First, it seems that the current implementation of WriteToParquet really
> does not play well with streaming pipelines. There are several options
> that could be used to overcome this limitation:
>
> a) You can try fixing the sink; maybe adding an
> AfterWatermark.pastEndOfWindow() trigger might be enough to make it work
> (needs to be tested).
>
> b) If the Java implementation of ParquetIO works for streaming pipelines
> (and I would suppose it does), you can use a cross-language transform to
> run ParquetIO from Python, see [1] for a quick start.
>
> c) Generally speaking, using a full-blown streaming engine for tasks like
> "buffer this and store it in bulk after a timeout" is inefficient. An
> alternative approach would be to just use a KafkaConsumer, create Parquet
> files on local disk, push them to GCS, and commit offsets afterwards.
> Streaming engines buffer data in replicated distributed state, which adds
> unneeded complexity.
>
> d) If there is some non-trivial processing between consuming elements
> from Kafka and writing outputs, then it might be an alternative to
> process the data in a streaming pipeline, write outputs back to Kafka,
> and then use approach (c) to get it to GCS.
>
> The specific solution depends on the actual requirements of your
> customers.
>
> Best,
>
> Jan
>
> [1]
> https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/
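A minimal, untested sketch of option (c), assuming confluent-kafka, pyarrow
and google-cloud-storage are available; broker, topic and bucket names are
placeholders, and Avro decoding of the payloads is omitted:

import time
import uuid

import pyarrow as pa
import pyarrow.parquet as pq
from confluent_kafka import Consumer
from google.cloud import storage

consumer = Consumer({
    "bootstrap.servers": "broker:9092",   # placeholder
    "group.id": "gcs-ingest",             # placeholder
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,          # commit manually, only after upload
})
consumer.subscribe(["my-topic"])          # placeholder topic

bucket = storage.Client().bucket("my-bucket")  # placeholder bucket

batch, deadline = [], time.time() + 300
while True:
    msg = consumer.poll(1.0)
    if msg is not None and msg.error() is None:
        # Raw bytes here; Avro decoding via Schema Registry would go in this step.
        batch.append({"key": msg.key(), "value": msg.value()})
    if batch and (len(batch) >= 10_000 or time.time() > deadline):
        local_path = f"/tmp/{uuid.uuid4()}.parquet"
        pq.write_table(pa.Table.from_pylist(batch), local_path)
        bucket.blob(f"ingest/{uuid.uuid4()}.parquet").upload_from_filename(local_path)
        consumer.commit(asynchronous=False)  # offsets advance only after the upload succeeded
        batch, deadline = [], time.time() + 300

Because offsets are committed only after the file has landed in GCS, a crash
before the commit means re-reading some records (at-least-once) rather than
losing them.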
> On 3/14/24 09:34, Ondřej Pánek wrote:
>
> Basically, this is the error we receive when trying to use the Avro or
> Parquet sinks (attached image).
>
> Also, check the sample pipeline that triggers this error (when deploying
> with DataflowRunner). So obviously, there is no global window or default
> trigger. That's, I believe, what's described in the issue:
> https://github.com/apache/beam/issues/25598
>
> From: Ondřej Pánek <ondrej.pa...@bighub.cz>
> Date: Thursday, March 14, 2024 at 07:57
> To: user@beam.apache.org
> Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
>
> Hello, thanks for the reply!
>
> Please refer to these:
>
> - https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
> - https://github.com/apache/beam/issues/25598
>
> Best,
>
> Ondrej
>
> From: XQ Hu via user <user@beam.apache.org>
> Date: Thursday, March 14, 2024 at 02:32
> To: user@beam.apache.org
> Cc: XQ Hu <x...@google.com>
> Subject: Re: Specific use-case question - Kafka-to-GCS-avro-Python
>
> Can you explain more about "current sinks for Avro and Parquet with the
> destination of GCS are not supported"?
>
> We do have AvroIO and ParquetIO
> (https://beam.apache.org/documentation/io/connectors/) in Python.
>
> On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek <ondrej.pa...@bighub.cz>
> wrote:
>
> Hello Beam team!
>
> We're currently onboarding the customer's infrastructure to the Google
> Cloud Platform. The decision was made that one of the technologies they
> will use is Dataflow. Let me briefly describe the use-case specification:
>
> They have a Kafka cluster where data from a CDC data source is stored.
> The data in the topics is stored in Avro format. Their other requirement
> is that they want a streaming solution reading from these Kafka topics
> and writing to Google Cloud Storage, again in Avro. What's more, the
> component should be written in Python, since their Data Engineers heavily
> prefer Python over Java.
>
> We've been struggling with the design of the solution for a couple of
> weeks now, and we're facing a quite unfortunate situation, not really
> finding any solution that would fit these requirements.
>
> So the question is: Is there any existing Dataflow template/solution with
> the following specifications?
>
> - Streaming connector
> - Written in Python
> - Consumes from Kafka topics
> - Reads Avro with Schema Registry
> - Writes Avro to GCS
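For the first four requirements above, a rough, untested sketch in the Python
SDK, assuming the cross-language ReadFromKafka transform (which runs the Java
KafkaIO via an expansion service) and the confluent-kafka Schema Registry
client; broker, registry and topic names are placeholders, and the Avro sink
to GCS, the piece this thread identifies as missing, is left as a TODO:

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import MessageField, SerializationContext


class DecodeConfluentAvro(beam.DoFn):
    """Decodes Confluent wire-format Avro values via Schema Registry."""

    def __init__(self, registry_url, topic):
        self._registry_url = registry_url
        self._topic = topic

    def setup(self):
        client = SchemaRegistryClient({"url": self._registry_url})
        self._deserializer = AvroDeserializer(client)

    def process(self, kv):
        _, value = kv  # ReadFromKafka yields (key, value) pairs of bytes
        ctx = SerializationContext(self._topic, MessageField.VALUE)
        yield self._deserializer(value, ctx)


with beam.Pipeline() as p:  # streaming options and runner flags omitted
    records = (
        p
        | ReadFromKafka(
            consumer_config={"bootstrap.servers": "broker:9092"},  # placeholder
            topics=["my-topic"],                                    # placeholder
        )
        | beam.ParDo(DecodeConfluentAvro("http://registry:8081", "my-topic"))
    )
    # TODO: windowed Avro write to GCS (the missing sink), or WriteToBigQuery
    # as the thread later converges on.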
> We found out that the current sinks for Avro and Parquet with GCS as the
> destination are not supported for Python at the moment, which is
> basically the main blocker now.
>
> Any recommendations/suggestions would be really highly appreciated!
>
> Maybe the solution really does not exist and we need to create our own
> custom connector for it. The question in this case would be whether
> that's even possible theoretically, since we would really need to avoid
> another dead end.
>
> Thanks a lot for any help!
>
> Kind regards,
>
> Ondrej
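On the closing question of whether a custom connector is at least
theoretically possible: a very rough sketch of the writing side, a DoFn that
writes each batch of already-decoded records as one Avro file on GCS through
Beam's FileSystems API, assuming fastavro; the schema, batch sizes and output
prefix are placeholders, and windowing, retries and exactly-once concerns are
ignored:

import uuid

import apache_beam as beam
import fastavro
from apache_beam.io.filesystems import FileSystems

SCHEMA = {  # placeholder Avro schema
    "type": "record",
    "name": "Event",
    "fields": [{"name": "id", "type": "string"}],
}


class WriteAvroBatchToGcs(beam.DoFn):
    def __init__(self, output_prefix):
        self._output_prefix = output_prefix
        self._parsed = fastavro.parse_schema(SCHEMA)

    def process(self, batch):
        # One Avro file per incoming batch of record dicts.
        path = f"{self._output_prefix}/{uuid.uuid4()}.avro"
        handle = FileSystems.create(path)
        try:
            fastavro.writer(handle, self._parsed, batch)
        finally:
            handle.close()
        yield path


# Usage sketch: group decoded records into batches, then write each batch.
# with beam.Pipeline() as p:
#     (p
#      | ...  # e.g. ReadFromKafka + Avro decoding as sketched earlier
#      | beam.BatchElements(min_batch_size=1000, max_batch_size=10000)
#      | beam.ParDo(WriteAvroBatchToGcs("gs://my-bucket/ingest")))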