Re: Backup event from Kafka to S3 in parquet format every minute

2023-02-17 Thread Alexey Romanenko
Piotr,

> On 17 Feb 2023, at 09:48, Wiśniowski Piotr wrote:
> Does this mean that Parquet IO does not support partitioning, and we need to 
> do some workarounds? Like explicitly mapping each window to a separate 
> Parquet file?
> 

Could you elaborate a bit more on this? IIRC, we used to read partitioned
Parquet files with ParquetIO while running the TPC-DS benchmark.

—
Alexey

Re: Backup event from Kafka to S3 in parquet format every minute

2023-02-17 Thread Pavel Solomin
For me this use case worked with the following window definition. It took a
bit of trial and error, and I can't claim a 100% understanding of the
windowing logic.

Here's my java code for Kinesis -> Parquet files which worked:
https://github.com/psolomin/beam-playground/blob/4968d8f43082113e3e643d7fc3418a7738a67c9a/kinesis-io-with-enhanced-fan-out/src/main/java/com/psolomin/consumer/KinesisToFilePipeline.java#L56

I hope it's not hard to derive the beam-python window config from it.
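
Roughly, the same kind of non-default trigger configuration in beam-python
would look something like the sketch below. This is untested, and the exact
trigger and lateness in my Java pipeline may differ, so treat it only as a
starting point; "events" stands for whatever PCollection comes out of the
Kafka read and transforms:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms import trigger

windowed = (
    events
    | "window" >> beam.WindowInto(
        window.FixedWindows(60),  # 1-minute windows
        # Fire once the watermark passes the end of the window.
        trigger=trigger.AfterWatermark(),
        accumulation_mode=trigger.AccumulationMode.DISCARDING,
        allowed_lateness=0,
    )
)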

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin

On Fri, 17 Feb 2023 at 08:49, Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi,
>
> Sounds like the exact problem that I described a few emails before -
> https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph
>
> Does this mean that Parquet IO does not support partitioning, and we need
> to do some workarounds? Like explicitly mapping each window to a separate
> Parquet file? This could be a solution in your case, if it works (just an
> idea worth trying - I did not test it and do not have enough experience with
> Beam), but I am limited to pure SQL and not sure how I could do it.
>
> Hope this helps with your problem, and that Beam support can find some
> solution for my case too.
>
> Best
>
> Wiśniowski Piotr
> On 17.02.2023 02:00, Lydian wrote:
>
> I want to make a simple Beam pipeline which will store the events from
> Kafka to S3 in Parquet format every minute.
>
> Here's a simplified version of my pipeline:
>
> def add_timestamp(event: Any) -> Any:
>     from datetime import datetime
>     from apache_beam import window
>
>     return window.TimestampedValue(
>         event, datetime.timestamp(event[1].timestamp))
>
> # Actual Pipeline
> (
>     pipeline
>     | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
>     | "Transformed" >> beam.Map(my_transform)
>     | "Add timestamp" >> beam.Map(add_timestamp)
>     | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 min
>     | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
> )
>
> However, the pipeline failed with
>
> GroupByKey cannot be applied to an unbounded PCollection with global 
> windowing and a default trigger
>
> This seems to be coming from
> https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146
> which always adds GlobalWindows and thus causes this error. I'm wondering
> what I should do to correctly back up the events from Kafka (unbounded) to
> S3. Thanks!
>
> Btw, I am running with the portableRunner on Flink. The Beam version is
> 2.41.0 (the latest version seems to have the same code) and the Flink
> version is 1.14.5.
>
>
>
> Sincerely,
> Lydian Lee
>
>


Re: Backup event from Kafka to S3 in parquet format every minute

2023-02-17 Thread Wiśniowski Piotr

Hi,

Sounds like the exact problem that I described a few emails before -
https://lists.apache.org/thread/q929lbwp8ylchbn8ngypfqlbvrwpfzph


Does this mean that Parquet IO does not support partitioning, and we need to
do some workarounds? Like explicitly mapping each window to a separate
Parquet file? This could be a solution in your case, if it works (just an
idea worth trying - I did not test it and do not have enough experience with
Beam), but I am limited to pure SQL and not sure how I could do it.
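
For illustration, a very rough, untested sketch of that "one Parquet file per
window" idea in beam-python. The path layout, the single-key GroupByKey and
the assumption that each record is a dict matching pyarrow_schema are all
guesses, not something I have verified:

import apache_beam as beam
import pyarrow as pa
import pyarrow.parquet as pq
from apache_beam.io.filesystems import FileSystems


class WriteWindowToParquet(beam.DoFn):
    """Writes all records of one window (per key) into a single Parquet file."""

    def __init__(self, output_prefix, schema):
        self._output_prefix = output_prefix
        self._schema = schema

    def process(self, keyed_records, window=beam.DoFn.WindowParam):
        _, records = keyed_records
        # One file per window, named after the window bounds.
        path = "%s/%s_%s.parquet" % (
            self._output_prefix,
            window.start.to_utc_datetime().isoformat(),
            window.end.to_utc_datetime().isoformat())
        # Assumes each record is a dict matching the pyarrow schema.
        table = pa.Table.from_pylist(list(records), schema=self._schema)
        with FileSystems.create(path) as f:
            pq.write_table(table, f)


# Usage after the FixedWindows(60) step. Note that the single key funnels a
# whole window through one worker, which may be acceptable for a per-minute
# backup but will not scale to large windows:
#   | beam.WithKeys(lambda _: None)
#   | beam.GroupByKey()
#   | beam.ParDo(WriteWindowToParquet("s3://test-bucket/backup", pyarrow_schema))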


Hope this helps with your problem, and that Beam support can find some
solution for my case too.


Best

Wiśniowski Piotr

On 17.02.2023 02:00, Lydian wrote:
I want to make a simple Beam pipeline which will store the events from
Kafka to S3 in Parquet format every minute.


Here's a simplified version of my pipeline:

def add_timestamp(event: Any) -> Any:
    from datetime import datetime
    from apache_beam import window

    return window.TimestampedValue(
        event, datetime.timestamp(event[1].timestamp))

# Actual Pipeline
(
    pipeline
    | "Read from Kafka" >> ReadFromKafka(consumer_config, topics, with_metadata=False)
    | "Transformed" >> beam.Map(my_transform)
    | "Add timestamp" >> beam.Map(add_timestamp)
    | "window" >> beam.WindowInto(window.FixedWindows(60))  # 1 min
    | "writing to parquet" >> beam.io.WriteToParquet('s3://test-bucket/', pyarrow_schema)
)


However, the pipeline failed with

GroupByKey cannot be applied to an unbounded PCollection with global
windowing and a default trigger


This seems to be coming from
https://github.com/apache/beam/blob/v2.41.0/sdks/python/apache_beam/io/iobase.py#L1145-L1146
which always adds GlobalWindows and thus causes this error. I'm wondering
what I should do to correctly back up the events from Kafka (unbounded) to
S3. Thanks!
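
I think the failing combination reduces to roughly the sketch below (with
TestStream standing in for Kafka as an unbounded source - this is a guess at
a minimal reproduction, not taken from my actual job):

import apache_beam as beam
from apache_beam import window
from apache_beam.testing.test_stream import TestStream

with beam.Pipeline() as p:
    (
        p
        # TestStream output is an unbounded PCollection, like the Kafka read.
        | TestStream().add_elements([("key", "value")]).advance_watermark_to_infinity()
        # Effectively what the write transform re-applies internally.
        | beam.WindowInto(window.GlobalWindows())
        # Fails at construction: "GroupByKey cannot be applied to an unbounded
        # PCollection with global windowing and a default trigger".
        | beam.GroupByKey()
    )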


Btw, I am running with the portableRunner on Flink. The Beam version is
2.41.0 (the latest version seems to have the same code) and the Flink
version is 1.14.5.




Sincerely,
Lydian Lee