My attempt to fix https://github.com/apache/beam/issues/25598:
https://github.com/apache/beam/pull/30728

On Thu, Mar 21, 2024 at 10:35 AM Ondřej Pánek <ondrej.pa...@bighub.cz>
wrote:

> Hello Jan,
>
>
>
> thanks a lot for the detailed answer! During the last week, the customer
> changed their requirements: the output should now go to BigQuery. As we
> understand it, that is supported even in the Beam Python SDK, and we already
> have a PoC for it.
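>
> Roughly, the PoC looks like this (a simplified sketch; the broker, topic,
> table, and schema below are placeholders, and the Avro decoding is left
> out):
>
>     # Simplified Kafka -> BigQuery PoC sketch; all names are placeholders.
>     import apache_beam as beam
>     from apache_beam.io.kafka import ReadFromKafka
>     from apache_beam.options.pipeline_options import PipelineOptions
>
>     def to_row(kv):
>         # kv is a (key, value) pair of bytes; a real pipeline decodes Avro here.
>         return {"payload": kv[1].decode("utf-8", errors="replace")}
>
>     options = PipelineOptions(streaming=True)
>     with beam.Pipeline(options=options) as p:
>         (p
>          | "ReadKafka" >> ReadFromKafka(
>                consumer_config={"bootstrap.servers": "broker:9092",
>                                 "group.id": "cdc-to-bq"},
>                topics=["example-topic"])
>          | "ToRow" >> beam.Map(to_row)
>          | "WriteBQ" >> beam.io.WriteToBigQuery(
>                "project:dataset.table",
>                schema="payload:STRING",
>                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
>                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))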
>
>
>
> You have a very good point with bullet c). That’s indeed our case now. The
> data will really only be transferred from topics on the Kafka MirrorMaker
> cluster (managed by the customer) sitting in GCP to BigQuery, rather than
> going through any heavy transformations. However, the number of topics is
> quite large (hundreds), and the customer wants additional flexibility when
> adding/removing topics, so the transfer job should pick up such changes
> dynamically.
>
>
>
> TBH, we also started to think about PySpark Streaming on Dataproc and
> created a PoC there as well. It looks more lightweight than Dataflow & Beam
> for the initial runs.
>
>
>
> Also, and you will definitely know better, it looks like offset management
> in Beam/Dataflow streaming is a bit of a “black box” compared to the
> external offset storage in Spark Streaming. The problem I’m having with
> Dataflow now is that after the job is removed, the internal state, and hence
> the offsets, is reset, and one needs to make sure (somehow?) that the
> consumed data is neither duplicated nor lost when the Dataflow job is
> restarted.
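>
> If I read the documentation correctly, one thing that could help is asking
> the Kafka read to commit offsets back to the consumer group once bundles
> are finalized, so a recreated job can resume from the committed position
> instead of relying only on Dataflow-internal state. The relevant part would
> be roughly (broker, group, and topic are placeholders):
>
>     from apache_beam.io.kafka import ReadFromKafka
>
>     ReadFromKafka(
>         consumer_config={
>             "bootstrap.servers": "broker:9092",
>             "group.id": "cdc-ingest",         # offsets are committed under this group
>             "auto.offset.reset": "earliest",  # starting point when no committed offset exists
>         },
>         topics=["example-topic"],
>         commit_offset_in_finalize=True,       # commit back to Kafka after finalization
>     )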
>
>
>
> Another Kafka cluster with Kafka Connect is not really an option, again
> because of the customer’s requirements. The Data Engineering team wants to
> have full control over this ingestion solution, and Kafka cluster management
> is not in their scope; what’s more, neither is Java in general.
>
>
>
> Thanks for the answers and opinions so far!
>
>
>
> Best,
>
>
>
> Ondřej
>
>
>
>
>
>
>
> *From: *Jan Lukavský <je...@seznam.cz>
> *Date: *Thursday, March 14, 2024 at 14:13
> *To: *user@beam.apache.org <user@beam.apache.org>
> *Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python
>
> Self-correction: since you are using a streaming pipeline without final
> watermark emission (I suppose), option (a) will not work. Patching the sink
> to support generic windowing would probably be much more involved.
>
> On 3/14/24 14:07, Jan Lukavský wrote:
>
> Hi Ondřej,
>
> I'll start with a disclaimer: I'm not exactly an expert on either the
> Python SDK or ParquetIO, so please take these just as suggestions off the
> top of my head.
>
> First, it seems that the current implementation of WriteToParquet really
> does not play well with streaming pipelines. There are several options that
> could be used to overcome this limitation:
>
>  a) you can try fixing the sink; maybe adding an
> AfterWatermark.pastEndOfWindow() trigger is enough to make it work (needs
> to be tested; a rough Python sketch of the idea follows after option (d))
>
>  b) if the Java implementation of ParquetIO works for streaming pipelines
> (and I would suppose it does), you can use a cross-language transform to
> run ParquetIO from Python, see [1] for a quick start
>
>  c) generally speaking, using a full-blown streaming engine for tasks like
> "buffer this and store it in bulk after a timeout" is inefficient. An
> alternative approach would be to just use a plain KafkaConsumer, create
> Parquet files on local disk, push them to GCS, and commit offsets afterwards
> (see the second sketch after option (d)). Streaming engines buffer data in
> replicated distributed state, which adds unneeded complexity.
>
>  d) if there is some non-trivial processing between consuming elements
> from Kafka and writing outputs, then an alternative might be to process the
> data in a streaming pipeline, write the outputs back to Kafka, and then use
> approach (c) to get them to GCS
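>
> For (a), the untested idea in Python would look roughly as follows
> ("records" stands for whatever PCollection comes out of the Kafka read; the
> window size is arbitrary):
>
>     # Untested sketch for (a): window the unbounded stream and fire on the
>     # watermark before handing it to the Parquet sink.
>     import apache_beam as beam
>     from apache_beam.transforms import trigger, window
>
>     windowed = (
>         records
>         | "Window" >> beam.WindowInto(
>             window.FixedWindows(5 * 60),  # 5-minute windows
>             trigger=trigger.AfterWatermark(),
>             accumulation_mode=trigger.AccumulationMode.DISCARDING))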
>
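> And an illustration of the loop in (c); the library choices (confluent-kafka,
> pyarrow, google-cloud-storage) as well as all names and sizes are just
> placeholders to show the shape of the idea:
>
>     # Sketch for (c): consume, write a local Parquet file, upload it to GCS,
>     # and only then commit offsets, so restarts neither lose nor duplicate data.
>     import pyarrow as pa
>     import pyarrow.parquet as pq
>     from confluent_kafka import Consumer
>     from google.cloud import storage
>
>     consumer = Consumer({
>         "bootstrap.servers": "broker:9092",
>         "group.id": "parquet-archiver",
>         "enable.auto.commit": False,  # commit manually, only after the upload succeeds
>     })
>     consumer.subscribe(["example-topic"])
>     bucket = storage.Client().bucket("example-bucket")
>
>     batch, batch_size = [], 10000
>     while True:
>         msg = consumer.poll(1.0)
>         if msg is None or msg.error():
>             continue
>         batch.append({"key": msg.key(), "value": msg.value()})
>         if len(batch) >= batch_size:
>             pq.write_table(pa.Table.from_pylist(batch), "/tmp/batch.parquet")
>             blob = bucket.blob("example-topic/batch-%d.parquet" % msg.offset())
>             blob.upload_from_filename("/tmp/batch.parquet")
>             consumer.commit(asynchronous=False)  # offsets committed after upload
>             batch = []
>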
> The specific solution depends on the actual requirements of your customers.
>
> Best,
>
>  Jan
>
> [1]
> https://beam.apache.org/documentation/sdks/python-multi-language-pipelines/
>
> On 3/14/24 09:34, Ondřej Pánek wrote:
>
> Basically, this is the error we receive when trying to use the Avro or
> Parquet sinks (attached image).
>
> Also, see the sample pipeline that triggers this error (when deployed with
> the DataflowRunner). So obviously, there is no global window or default
> trigger in it. That’s, I believe, what’s described in this issue:
> https://github.com/apache/beam/issues/25598
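>
> In case it helps, a minimal pipeline of the same shape (not the attached
> one, just an illustration with placeholder broker/topic/bucket names) should
> hit the same error:
>
>     # Minimal illustration of the failing shape: a windowed streaming read
>     # from Kafka piped into WriteToParquet, even with windowing applied.
>     import apache_beam as beam
>     import pyarrow
>     from apache_beam.io.kafka import ReadFromKafka
>     from apache_beam.options.pipeline_options import PipelineOptions
>     from apache_beam.transforms import window
>
>     schema = pyarrow.schema([("value", pyarrow.binary())])
>
>     with beam.Pipeline(options=PipelineOptions(streaming=True)) as p:
>         (p
>          | ReadFromKafka(consumer_config={"bootstrap.servers": "broker:9092"},
>                          topics=["example-topic"])
>          | beam.Map(lambda kv: {"value": kv[1]})
>          | beam.WindowInto(window.FixedWindows(60))
>          | beam.io.WriteToParquet("gs://example-bucket/out", schema=schema))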
>
>
>
>
>
> *From: *Ondřej Pánek <ondrej.pa...@bighub.cz>
> *Date: *Thursday, March 14, 2024 at 07:57
> *To: *user@beam.apache.org <user@beam.apache.org>
> *Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python
>
> Hello, thanks for the reply!
>
> Please, refer to these:
>
>
>
>    - https://www.googlecloudcommunity.com/gc/Data-Analytics/kafka-to-parquet/m-p/646836
>    - https://github.com/apache/beam/issues/25598
>
>
>
> Best,
>
>
>
> Ondrej
>
>
>
> *From: *XQ Hu via user <user@beam.apache.org>
> *Date: *Thursday, March 14, 2024 at 02:32
> *To: *user@beam.apache.org <user@beam.apache.org>
> *Cc: *XQ Hu <x...@google.com>
> *Subject: *Re: Specific use-case question - Kafka-to-GCS-avro-Python
>
> Can you explain more about "current sinks for Avro and Parquet with the
> destination of GCS are not supported"?
>
>
>
> We do have AvroIO and ParquetIO
> (https://beam.apache.org/documentation/io/connectors/) in Python.
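>
> For example, the batch usage looks roughly like this (the schema, records,
> and output path are just placeholders):
>
>     # Minimal batch example of the Python Parquet sink.
>     import apache_beam as beam
>     import pyarrow
>
>     schema = pyarrow.schema([("id", pyarrow.int64()), ("name", pyarrow.string())])
>
>     with beam.Pipeline() as p:
>         (p
>          | beam.Create([{"id": 1, "name": "a"}, {"id": 2, "name": "b"}])
>          | beam.io.WriteToParquet("gs://example-bucket/out", schema=schema))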
>
>
>
> On Wed, Mar 13, 2024 at 5:35 PM Ondřej Pánek <ondrej.pa...@bighub.cz>
> wrote:
>
> Hello Beam team!
>
>
>
> We’re currently onboarding the customer’s infrastructure to the Google
> Cloud Platform. The decision was made that one of the technologies they
> will use is Dataflow. Let me briefly describe the use-case specification:
>
> They have a Kafka cluster where data from a CDC data source is stored. The
> data in the topics is stored in Avro format. Their other requirement is a
> streaming solution that reads from these Kafka topics and writes to Google
> Cloud Storage, again in Avro. What’s more, the component should be written
> in Python, since their Data Engineers heavily prefer Python over Java.
>
>
>
> We’ve been struggling with the design of the solution for a couple of weeks
> now, and we’re in quite an unfortunate situation, not really finding any
> solution that would fit these requirements.
>
>
>
> So the question is: Is there any existing Dataflow template/solution with
> the following specifications:
>
>    - Streaming connector
>    - Written in Python
>    - Consumes from Kafka topics
>    - Reads Avro with Schema Registry
>    - Writes Avro to GCS
>
>
>
> We found out that the current sinks for Avro and Parquet writing to GCS are
> not supported for streaming pipelines in Python at the moment, which is
> basically the main blocker now.
>
>
>
> Any recommendations/suggestions would be really highly appreciated!
>
>
>
> Maybe the solution really does not exist and we need to create our own
> custom connector for it. The question in that case would be whether that’s
> even theoretically possible, since we would really need to avoid another
> dead end.
>
>
>
> Thanks a lot for any help!
>
>
>
> Kind regards,
>
> Ondrej
>
>
