Re: Apache beam github repo collaborator

2024-07-13 Thread XQ Hu via user
Welcome to Beam! You can start contributing now.

Some useful docs:

   - https://github.com/apache/beam/blob/master/CONTRIBUTING.md
   - https://github.com/apache/beam/tree/master/contributor-docs
   - https://cwiki.apache.org/confluence/display/BEAM/Developer+Guides

You can start with some good first issues:
https://github.com/apache/beam/issues?q=is%3Aissue+is%3Aopen+label%3A%22good+first+issue%22+


On Sat, Jul 13, 2024 at 5:47 PM Prerit Chandok 
wrote:

> Hi Team,
>
> I would like to contribute to the Apache Beam repository, for which I would
> like to request contributor access to the repo. Thanks.
>
> Best Regards,
> Prerit Chandok
>
>


Re: [Question] Issue with MongoDB Read in Apache Beam - InvalidBSON Error

2024-07-11 Thread XQ Hu via user
You are welcome to create a PR to fix this issue if you need to change the
connector source code.

On Sun, Jul 7, 2024 at 5:39 AM Marcin Stańczak 
wrote:

> Hello Apache Beam Community,
>
> I'm Marcin and I am currently working on a project using Apache Beam
> 2.57.0. I have encountered an issue when reading data from MongoDB
> with the "mongodbio" connector. I am unable to reach the
> transformation step due to an InvalidBSON error related to
> out-of-range dates.
>
> Error Message:
>
> bson.errors.InvalidBSON: year 55054 is out of range (Consider Using
> CodecOptions(datetime_conversion=DATETIME_AUTO) or
> MongoClient(datetime_conversion='DATETIME_AUTO')). See:
>
> https://pymongo.readthedocs.io/en/stable/examples/datetimes.html#handling-out-of-range-datetimes
>
> Here are the details of my setup:
>
> Apache Beam version: 2.57.0
> Python version: 3.10
>
> In my current MongoDB collection, it is possible to encounter dates
> that are outside the standard range, such as year 0 or years greater
> than the maximum supported year, which causes this issue.
>
> I have handled this issue in standalone Python scripts using
> CodecOptions and DatetimeConversion. However, I am facing difficulties
> integrating this logic within an Apache Beam pipeline and I don't
> think it's possible to handle without changing the source code of this
> connector. I would appreciate any guidance or suggestions on how to
> resolve this issue within the Beam framework.
>
> Thank you for your assistance.
>
> Best regards,
> Marcin
>


RE: Re: Question: Pipelines Stuck with Java 21 and BigQuery Storage Write API

2024-07-08 Thread Kensuke Tachibana
Hi,

This is Kensuke. I’m responding on behalf of Kazuha (he is my colleague).

> Which runner are you using?

We are using the DirectRunner. As you suggested, we specified
"--add-opens=java.base/java.lang=ALL-UNNAMED" in the JVM invocation command
line, and it worked in the Java 21 environment. Specifically, after setting
"export MAVEN_OPTS='--add-opens=java.base/java.lang=ALL-UNNAMED'", we ran
"mvn compile exec:java" and the error was resolved.
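For reference, on the Dataflow runner the quoted reply below suggests passing the opens as a pipeline option rather than a JVM flag. A minimal, untested sketch, assuming the setter that matches the getJdkAddOpenModules getter in the linked javadoc:

    import java.util.Arrays;
    import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class AddOpensOptionSketch {
      public static void main(String[] args) {
        DataflowPipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).as(DataflowPipelineOptions.class);
        // Dataflow-side equivalent of --add-opens=java.base/java.lang=ALL-UNNAMED.
        options.setJdkAddOpenModules(Arrays.asList("java.base/java.lang=ALL-UNNAMED"));
        // ... build and run the pipeline with these options ...
      }
    }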

> The same enforcement was introduced in both Java17 and 21, and it is
> strange that Java17 worked without the option but Java21 didn't.

I agree that it is strange.

> Are you testing on the same beam version and other configurations?

We have not made any changes other than what was mentioned in previous
emails.

> Try the latest beam version 2.56.0 and this option may not be needed.

We tried using Beam version 2.56.0, but now we are encountering a different
error and it doesn’t work. This error occurs with Beam 2.56.0 in both Java
17 and Java 21 environments. I have investigated the issue myself, but I
couldn’t determine the cause.

Do you have any insights based on this information?

On 2024/06/07 14:18:07 Yi Hu via user wrote:
> Hi,
>
> Which runner are you using? If you are running on Dataflow runner, then
> refer to this [1] and add
> "--jdkAddOpenModules=java.base/java.lang=ALL-UNNAMED" to pipeline option.
> If using direct runner, then add
> "--add-opens=java.base/java.lang=ALL-UNNAMED" to JVM invocation command
> line.
>
> The same enforcement was introduced in both Java17 and 21, and it is
> strange that Java17 worked without the option but Java21 didn't. Are you
> testing on the same beam version and other configurations? Also, more
> recent beam versions eliminated most usage of
> "ClassLoadingStrategy.Default.INJECTION"
> that cause this pipeline option being required, e.g. [2]. Try the latest
> beam version 2.56.0 and this option may not be needed.
>
> [1]
>
https://beam.apache.org/releases/javadoc/current/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.html#getJdkAddOpenModules--
>
> [2] https://github.com/apache/beam/pull/30367
>
>
>
> On Mon, Jun 3, 2024 at 7:14 PM XQ Hu  wrote:
>
> > Probably related to the strict encapsulation that is enforced with Java
> > 21.
> > Using `--add-opens=java.base/java.lang=ALL-UNNAMED` as a JVM flag could be
> > a temporary workaround.
> >
> > On Mon, Jun 3, 2024 at 3:04 AM 田中万葉  wrote:
> >
> >> Hi all,
> >>
> >> I encountered an UnsupportedOperationException when using Java 21 and the
> >> BigQuery Storage Write API in a Beam pipeline by using
> >> ".withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));"
> >>
> >> Having read issue #28120[1] and understanding that Beam version 2.52.0 or
> >> later supports Java 21 as a runtime, I wonder why such an error happens.
> >>
> >> I found there are two workarounds, but the Storage Write API is the
> >> preferable way to insert data into BigQuery, so I'd like to find a
> >> solution.
> >>
> >> 1. One workaround is to switch from Java 21 to Java 17 (openjdk version
> >> "17.0.10" 2024-01-16). By changing the Java source and target versions
> >> in the pom.xml file (i.e., without modifying
> >> App.java itself), the pipeline successfully writes data to my destination
> >> table on BigQuery. It seems Java 17 and the BigQuery Storage Write API
> >> work fine.
> >> 2. The other workaround is to change the insert method. I tried the BigQuery
> >> legacy streaming API (
> >> https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery )
> >> instead of the Storage Write API. Even though I still used Java 21, when I
> >> changed my code to
> >> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS));, I did not
> >> encounter the error.
> >>
> >> So I faced the error only when using Java 21 and the BigQuery Storage Write
> >> API.
> >>
> >> I uploaded the code below to reproduce. Could you please inform me how to
> >> handle this issue?
> >> https://github.com/cloud-ace/min-reproduce
> >>
> >> My Environment
> >> - OS
> >>   - Ubuntu 22.04
> >>   - Mac OS Sonoma(14.3.1)
> >> - beam 2.53.0, 2.54.0
> >> - openjdk version "21.0.2" 2024-01-16
> >> - maven 3.9.6
> >> - DirectRunner
> >>
> >> Thanks,
> >>
> >> Kazuha
> >>
> >> [1]: https://github.com/apache/beam/issues/28120
> >>
> >> Here is the detailed error message.
> >>
> >> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> >> java.lang.UnsupportedOperationException: Cannot define class using
> >> reflection: Unable to make protected java.lang.Package
> >> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
> >> java.base does not "opens java.lang" to unnamed module @116d5dff
> >>
> >> Caused by: java.lang.UnsupportedOperationException: Cannot define class
> >> using reflection: Unable to make protected java.lang.Package
> >> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
> >> java.base does not "opens java.lang" to unnamed module @116d5dff
> >> at
> >>

Re: beam using flink runner to achieve data locality in a distributed setup?

2024-07-02 Thread Jan Lukavský
Unfortunately, no. At least not in the case of the FlinkRunner. As already 
mentioned, Beam does not currently collect information about the location of 
source splits, so this information cannot be passed to Flink.


> If there is no locality aware processing the whole thing falls into 
its face.


A 1 Gbps network (current networks should actually be at least 10 Gbps, 
and 1 Gbps is roughly 125 MB/s) is quite "close" to the throughput of a 
single spinning disk. On the other hand, if the target is "seconds", you 
might want to have a look at some SQL-based distributed analytical 
engines; Flink's startup time itself will likely add significant overhead 
on top of the processing time.


On 7/2/24 16:03, Balogh, György wrote:

Hi Jan,
I need to process hundreds of GBs of data within seconds. With local 
data processing I can properly size a hw infrastructure to meet this 
(a couple of years back i did this with hadoop, worked perfectly). If 
there is no locality aware processing the whole thing falls into its 
face.


This comment suggests flink might do this under the hood?

https://stackoverflow.com/questions/38672091/flink-batch-data-local-planning-on-hdfs
Br,
Gyorgy


On Tue, Jul 2, 2024 at 3:08 PM Jan Lukavský  wrote:

Hi Gyorgy,

there is no concept of 'data locality' in Beam that would be
analogous to how MapReduce used to work. The fact that tasks
(compute) are co-located with storage on input is not transferred
to Beam Flink pipelines. The whole concept is kind of ill defined
in terms of Beam model, where tasks can be (at least in theory,
depending on a runner) moved between workers in a distributed
environment. The reason for this is that throughput (and cost) is
dominated mostly by the ability to (uniformly) scale, not the
costs associated with network transfers (this is actually most
visible in the streaming case, where the data is already 'in
motion'). The most common case in Beam is that compute is
completely separated from storage (possible even in the extreme
cases where streaming state is stored outside the compute of
streaming pipeline - but cached locally). The resulting
'stateless' nature of workers generally enables easier and more
flexible scaling.

Having said that, although Beam currently does not (AFAIK) try to
leverage local reads, it _could_ be possible by a reasonable
extension to how splittable DoFn [1] works so that it could make
use of data locality. It would be non-trivial, though, and would
definitely require support from the runner (Flink in this case).

My general suggestion would be to implement a prototype and
measure throughput and the part of it possibly related to networking
before attempting to dig deeper into how to implement this in Beam
Flink.

Best,

 Jan

[1] https://beam.apache.org/blog/splittable-do-fn/

On 7/2/24 10:46, Balogh, György wrote:

Hi Jan,
Separating live and historic storage makes sense. I need a
historic storage that can ensure data local processing using the
beam - flink stack.
Can I surely achieve this with HDFS? I can colocate hdfs nodes
with flink workers. What exactly enforces that flink nodes will
read local and not remote data?
Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 3:42 PM Jan Lukavský  wrote:

Hi Gyorgy,

comments inline.

On 7/1/24 15:10, Balogh, György wrote:

Hi Jan,
Let me add a few more details to show the full picture. We
have live datastreams (video analysis metadata) and we would
like to run both live and historic pipelines on the metadata
(eg.: live alerts, historic video searches).

This should be fine due to Beam's unified model. You can
write a PTransform that handles PCollection<...> without the
need to worry if the PCollection was created from Kafka or
some bounded source.

We planned to use kafka to store the streaming data and
directly run both types of queries on top. You are
suggesting to consider having kafka with small retention to
server the live queries and store the historic data
somewhere else which scales better for historic queries? We
need to have on prem options here. What options should we
consider that scales nicely (in terms of IO parallelization)
with beam? (eg. hdfs?)


Yes, I would not say necessarily "small" retention, but
probably "limited" retention. Running on premise you can
choose from HDFS or maybe S3 compatible minio or some other
distributed storage, depends on the scale and deployment
options (e.g. YARN or k8s).

I also happen to work on a system which targets exactly these
streaming-batch workloads (persisting upserts from stream to
batch for reprocessing), see [1]. Please feel free to contact
me directly if this sounds interesting.

Best,

 Jan

 

Re: beam using flink runner to achieve data locality in a distributed setup?

2024-07-02 Thread Balogh, György
Hi Jan,
I need to process hundreds of GBs of data within seconds. With local data
processing I can properly size a hardware infrastructure to meet this (a couple
of years back I did this with Hadoop, and it worked perfectly). If there is no
locality-aware processing, the whole thing falls on its face.

This comment suggests flink might do this under the hood?

https://stackoverflow.com/questions/38672091/flink-batch-data-local-planning-on-hdfs
Br,
Gyorgy


On Tue, Jul 2, 2024 at 3:08 PM Jan Lukavský  wrote:

> Hi Gyorgy,
>
> there is no concept of 'data locality' in Beam that would be analogous to
> how MapReduce used to work. The fact that tasks (compute) are co-located
> with storage on input is not transferred to Beam Flink pipelines. The whole
> concept is kind of ill defined in terms of Beam model, where tasks can be
> (at least in theory, depending on a runner) moved between workers in a
> distributed environment. The reason for this is that throughput (and cost)
> is dominated mostly by the ability to (uniformly) scale, not the costs
> associated with network transfers (this is actually most visible in the
> streaming case, where the data is already 'in motion'). The most common
> case in Beam is that compute is completely separated from storage (possible
> even in the extreme cases where streaming state is stored outside the
> compute of streaming pipeline - but cached locally). The resulting
> 'stateless' nature of workers generally enables easier and more flexible
> scaling.
>
> Having said that, although Beam currently does not (AFAIK) try to leverage
> local reads, it _could_ be possible by a reasonable extension to how
> splittable DoFn [1] works so that it could make use of data locality. It
> would be non-trivial, though, and would definitely require support from the
> runner (Flink in this case).
>
> My general suggestion would be to implement a prototype and measure
> throughput and the part of it possibly related to networking before attempting
> to dig deeper into how to implement this in Beam Flink.
>
> Best,
>
>  Jan
>
> [1] https://beam.apache.org/blog/splittable-do-fn/
> On 7/2/24 10:46, Balogh, György wrote:
>
> Hi Jan,
> Separating live and historic storage makes sense. I need a historic
> storage that can ensure data local processing using the beam - flink stack.
> Can I surely achieve this with HDFS? I can colocate hdfs nodes with flink
> workers. What exactly enforces that flink nodes will read local and not
> remote data?
> Thank you,
> Gyorgy
>
> On Mon, Jul 1, 2024 at 3:42 PM Jan Lukavský  wrote:
>
>> Hi Gyorgy,
>>
>> comments inline.
>> On 7/1/24 15:10, Balogh, György wrote:
>>
>> Hi Jan,
>> Let me add a few more details to show the full picture. We have live
>> datastreams (video analysis metadata) and we would like to run both live
>> and historic pipelines on the metadata (eg.: live alerts, historic video
>> searches).
>>
>> This should be fine due to Beam's unified model. You can write a
>> PTransform that handles PCollection<...> without the need to worry if the
>> PCollection was created from Kafka or some bounded source.
>>
>> We planned to use kafka to store the streaming data and directly run both
>> types of queries on top. You are suggesting to consider having kafka with
>> small retention to server the live queries and store the historic data
>> somewhere else which scales better for historic queries? We need to have on
>> prem options here. What options should we consider that scales nicely (in
>> terms of IO parallelization) with beam? (eg. hdfs?)
>>
>> Yes, I would not say necessarily "small" retention, but probably
>> "limited" retention. Running on premise you can choose from HDFS or maybe
>> S3 compatible minio or some other distributed storage, depends on the scale
>> and deployment options (e.g. YARN or k8s).
>>
>> I also happen to work on a system which targets exactly these
>> streaming-batch workloads (persisting upserts from stream to batch for
>> reprocessing), see [1]. Please feel free to contact me directly if this
>> sounds interesting.
>>
>> Best,
>>
>>  Jan
>>
>> [1] https://github.com/O2-Czech-Republic/proxima-platform
>>
>> Thank you,
>> Gyorgy
>>
>> On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský  wrote:
>>
>>> H Gyorgy,
>>>
>>> I don't think it is possible to co-locate tasks as you describe it. Beam
>>> has no information about location of 'splits'. On the other hand, if batch
>>> throughput is the main concern, then reading from Kafka might not be the
>>> optimal choice. Although Kafka provides tiered storage for offloading
>>> historical data, it still somewhat limits scalability (and thus
>>> throughput), because the data have to be read by a broker and only then
>>> passed to a consumer. The parallelism is therefore limited by the number of
>>> Kafka partitions and not parallelism of the Flink job. A more scalable
>>> approach could be to persist data from Kafka to a batch storage (e.g. S3 or
>>> GCS) and reprocess it from there.
>>>
>>> Best,
>>>
>>>  Jan

Re: beam using flink runner to achieve data locality in a distributed setup?

2024-07-02 Thread Jan Lukavský

Hi Gyorgy,

there is no concept of 'data locality' in Beam that would be analogous 
to how MapReduce used to work. The fact that tasks (compute) are 
co-located with storage on input is not transferred to Beam Flink 
pipelines. The whole concept is kind of ill defined in terms of Beam 
model, where tasks can be (at least in theory, depending on a runner) 
moved between workers in a distributed environment. The reason for this 
is that throughput (and cost) is dominated mostly by the ability to 
(uniformly) scale, not the costs associated with network transfers (this 
is actually most visible in the streaming case, where the data is 
already 'in motion'). The most common case in Beam is that compute is 
completely separated from storage (possible even in the extreme cases 
where streaming state is stored outside the compute of streaming 
pipeline - but cached locally). The resulting 'stateless' nature of 
workers generally enables easier and more flexible scaling.


Having said that, although Beam currently does not (AFAIK) try to 
leverage local reads, it _could_ be possible by a reasonable extension 
to how splittable DoFn [1] works so that it could make use of data 
locality. It would be non-trivial, though, and would definitely require 
support from the runner (Flink in this case).


My general suggestion would be to implement a prototype and measure 
throughput and the part of it possibly related to networking before 
attempting to dig deeper into how to implement this in Beam Flink.


Best,

 Jan

[1] https://beam.apache.org/blog/splittable-do-fn/

On 7/2/24 10:46, Balogh, György wrote:

Hi Jan,
Separating live and historic storage makes sense. I need a historic 
storage that can ensure data local processing using the beam - flink 
stack.
Can I surely achieve this with HDFS? I can colocate hdfs nodes with 
flink workers. What exactly enforces that flink nodes will read local 
and not remote data?

Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 3:42 PM Jan Lukavský  wrote:

Hi Gyorgy,

comments inline.

On 7/1/24 15:10, Balogh, György wrote:

Hi Jan,
Let me add a few more details to show the full picture. We have
live datastreams (video analysis metadata) and we would like to
run both live and historic pipelines on the metadata (eg.: live
alerts, historic video searches).

This should be fine due to Beam's unified model. You can write a
PTransform that handles PCollection<...> without the need to worry
if the PCollection was created from Kafka or some bounded source.

We planned to use kafka to store the streaming data and
directly run both types of queries on top. You are suggesting to
consider having kafka with small retention to server the live
queries and store the historic data somewhere else which scales
better for historic queries? We need to have on prem options
here. What options should we consider that scales nicely (in
terms of IO parallelization) with beam? (eg. hdfs?)


Yes, I would not say necessarily "small" retention, but probably
"limited" retention. Running on premise you can choose from HDFS
or maybe S3 compatible minio or some other distributed storage,
depends on the scale and deployment options (e.g. YARN or k8s).

I also happen to work on a system which targets exactly these
streaming-batch workloads (persisting upserts from stream to batch
for reprocessing), see [1]. Please feel free to contact me
directly if this sounds interesting.

Best,

 Jan

[1] https://github.com/O2-Czech-Republic/proxima-platform


Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský  wrote:

H Gyorgy,

I don't think it is possible to co-locate tasks as you
describe it. Beam has no information about location of
'splits'. On the other hand, if batch throughput is the main
concern, then reading from Kafka might not be the optimal
choice. Although Kafka provides tiered storage for offloading
historical data, it still somewhat limits scalability (and
thus throughput), because the data have to be read by a
broker and only then passed to a consumer. The parallelism is
therefore limited by the number of Kafka partitions and not
parallelism of the Flink job. A more scalable approach could
be to persist data from Kafka to a batch storage (e.g. S3 or
GCS) and reprocess it from there.

Best,

 Jan

On 6/29/24 09:12, Balogh, György wrote:

Hi,
I'm planning a distributed system with multiple kafka
brokers co located with flink workers.
Data processing throughput for historic queries is a main
KPI. So I want to make sure all flink workers read local
data and not remote. I'm defining my pipelines in beam using
java.
Is it possible? What are the critical config elements to
achieve this?

Re: beam using flink runner to achieve data locality in a distributed setup?

2024-07-02 Thread Balogh, György
Hi Jan,
Separating live and historic storage makes sense. I need a historic storage
that can ensure data local processing using the beam - flink stack.
Can I surely achieve this with HDFS? I can colocate hdfs nodes with flink
workers. What exactly enforces that flink nodes will read local and not
remote data?
Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 3:42 PM Jan Lukavský  wrote:

> Hi Gyorgy,
>
> comments inline.
> On 7/1/24 15:10, Balogh, György wrote:
>
> Hi Jan,
> Let me add a few more details to show the full picture. We have live
> datastreams (video analysis metadata) and we would like to run both live
> and historic pipelines on the metadata (eg.: live alerts, historic video
> searches).
>
> This should be fine due to Beam's unified model. You can write a
> PTransform that handles PCollection<...> without the need to worry if the
> PCollection was created from Kafka or some bounded source.
>
> We planned to use kafka to store the streaming data and directly run both
> types of queries on top. You are suggesting to consider having kafka with
> small retention to server the live queries and store the historic data
> somewhere else which scales better for historic queries? We need to have on
> prem options here. What options should we consider that scales nicely (in
> terms of IO parallelization) with beam? (eg. hdfs?)
>
> Yes, I would not say necessarily "small" retention, but probably "limited"
> retention. Running on premise you can choose from HDFS or maybe S3
> compatible minio or some other distributed storage, depends on the scale
> and deployment options (e.g. YARN or k8s).
>
> I also happen to work on a system which targets exactly these
> streaming-batch workloads (persisting upserts from stream to batch for
> reprocessing), see [1]. Please feel free to contact me directly if this
> sounds interesting.
>
> Best,
>
>  Jan
>
> [1] https://github.com/O2-Czech-Republic/proxima-platform
>
> Thank you,
> Gyorgy
>
> On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský  wrote:
>
>> H Gyorgy,
>>
>> I don't think it is possible to co-locate tasks as you describe it. Beam
>> has no information about location of 'splits'. On the other hand, if batch
>> throughput is the main concern, then reading from Kafka might not be the
>> optimal choice. Although Kafka provides tiered storage for offloading
>> historical data, it still somewhat limits scalability (and thus
>> throughput), because the data have to be read by a broker and only then
>> passed to a consumer. The parallelism is therefore limited by the number of
>> Kafka partitions and not parallelism of the Flink job. A more scalable
>> approach could be to persist data from Kafka to a batch storage (e.g. S3 or
>> GCS) and reprocess it from there.
>>
>> Best,
>>
>>  Jan
>> On 6/29/24 09:12, Balogh, György wrote:
>>
>> Hi,
>> I'm planning a distributed system with multiple kafka brokers co located
>> with flink workers.
>> Data processing throughput for historic queries is a main KPI. So I want
>> to make sure all flink workers read local data and not remote. I'm defining
>> my pipelines in beam using java.
>> Is it possible? What are the critical config elements to achieve this?
>> Thank you,
>> Gyorgy
>>
>> --
>>
>> György Balogh
>> CTO
>> E gyorgy.bal...@ultinous.com 
>> M +36 30 270 8342 <+36%2030%20270%208342>
>> A HU, 1117 Budapest, Budafoki út 209.
>> W www.ultinous.com
>>
>>
>
> --
>
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342 <+36%2030%20270%208342>
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
>
>

-- 

György Balogh
CTO
E gyorgy.bal...@ultinous.com 
M +36 30 270 8342 <+36%2030%20270%208342>
A HU, 1117 Budapest, Budafoki út 209.
W www.ultinous.com


Re: beam using flink runner to achieve data locality in a distributed setup?

2024-07-01 Thread Jan Lukavský

Hi Gyorgy,

comments inline.

On 7/1/24 15:10, Balogh, György wrote:

Hi Jan,
Let me add a few more details to show the full picture. We have live 
datastreams (video analysis metadata) and we would like to run both 
live and historic pipelines on the metadata (eg.: live alerts, 
historic video searches).
This should be fine due to Beam's unified model. You can write a 
PTransform that handles PCollection<...> without the need to worry if 
the PCollection was created from Kafka or some bounded source.
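A rough sketch of such a source-agnostic transform (the names and the filtering logic are illustrative, not from this thread):

    import org.apache.beam.sdk.transforms.Filter;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.PCollection;

    // Applies unchanged whether the input PCollection came from KafkaIO or a bounded source.
    class ExtractAlerts extends PTransform<PCollection<String>, PCollection<String>> {
      @Override
      public PCollection<String> expand(PCollection<String> metadata) {
        return metadata.apply("KeepAlerts", Filter.by(line -> line.contains("ALERT")));
      }
    }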
We planned to use kafka to store the streaming data and directly run 
both types of queries on top. You are suggesting to consider 
having kafka with small retention to server the live queries and store 
the historic data somewhere else which scales better for historic 
queries? We need to have on prem options here. What options should we 
consider that scales nicely (in terms of IO parallelization) with 
beam? (eg. hdfs?)


Yes, I would not say necessarily "small" retention, but probably 
"limited" retention. Running on premises you can choose from HDFS or 
maybe S3-compatible MinIO or some other distributed storage, depending on 
the scale and deployment options (e.g. YARN or k8s).


I also happen to work on a system which targets exactly these 
streaming-batch workloads (persisting upserts from stream to batch for 
reprocessing), see [1]. Please feel free to contact me directly if this 
sounds interesting.


Best,

 Jan

[1] https://github.com/O2-Czech-Republic/proxima-platform


Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský  wrote:

H Gyorgy,

I don't think it is possible to co-locate tasks as you describe
it. Beam has no information about location of 'splits'. On the
other hand, if batch throughput is the main concern, then reading
from Kafka might not be the optimal choice. Although Kafka
provides tiered storage for offloading historical data, it still
somewhat limits scalability (and thus throughput), because the
data have to be read by a broker and only then passed to a
consumer. The parallelism is therefore limited by the number of
Kafka partitions and not parallelism of the Flink job. A more
scalable approach could be to persist data from Kafka to a batch
storage (e.g. S3 or GCS) and reprocess it from there.

Best,

 Jan

On 6/29/24 09:12, Balogh, György wrote:

Hi,
I'm planning a distributed system with multiple kafka brokers co
located with flink workers.
Data processing throughput for historic queries is a main KPI. So
I want to make sure all flink workers read local data and not
remote. I'm defining my pipelines in beam using java.
Is it possible? What are the critical config elements to achieve
this?
Thank you,
Gyorgy

-- 


György Balogh
CTO
E   gyorgy.bal...@ultinous.com 
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com 




--

György Balogh
CTO
E   gyorgy.bal...@ultinous.com 
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com 


Re: beam using flink runner to achieve data locality in a distributed setup?

2024-07-01 Thread Balogh, György
Hi Jan,
Let me add a few more details to show the full picture. We have live
datastreams (video analysis metadata) and we would like to run both live
and historic pipelines on the metadata (eg.: live alerts, historic video
searches). We planned to use kafka to store the streaming data and
directly run both types of queries on top. You are suggesting to consider
having kafka with small retention to server the live queries and store the
historic data somewhere else which scales better for historic queries? We
need to have on prem options here. What options should we consider that
scales nicely (in terms of IO parallelization) with beam? (eg. hdfs?)
Thank you,
Gyorgy

On Mon, Jul 1, 2024 at 9:21 AM Jan Lukavský  wrote:

> H Gyorgy,
>
> I don't think it is possible to co-locate tasks as you describe it. Beam
> has no information about location of 'splits'. On the other hand, if batch
> throughput is the main concern, then reading from Kafka might not be the
> optimal choice. Although Kafka provides tiered storage for offloading
> historical data, it still somewhat limits scalability (and thus
> throughput), because the data have to be read by a broker and only then
> passed to a consumer. The parallelism is therefore limited by the number of
> Kafka partitions and not parallelism of the Flink job. A more scalable
> approach could be to persist data from Kafka to a batch storage (e.g. S3 or
> GCS) and reprocess it from there.
>
> Best,
>
>  Jan
> On 6/29/24 09:12, Balogh, György wrote:
>
> Hi,
> I'm planning a distributed system with multiple kafka brokers co located
> with flink workers.
> Data processing throughput for historic queries is a main KPI. So I want
> to make sure all flink workers read local data and not remote. I'm defining
> my pipelines in beam using java.
> Is it possible? What are the critical config elements to achieve this?
> Thank you,
> Gyorgy
>
> --
>
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342 <+36%2030%20270%208342>
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
>
>

-- 

György Balogh
CTO
E gyorgy.bal...@ultinous.com 
M +36 30 270 8342 <+36%2030%20270%208342>
A HU, 1117 Budapest, Budafoki út 209.
W www.ultinous.com


Re: beam using flink runner to achieve data locality in a distributed setup?

2024-07-01 Thread Jan Lukavský

Hi Gyorgy,

I don't think it is possible to co-locate tasks as you describe it. Beam 
has no information about location of 'splits'. On the other hand, if 
batch throughput is the main concern, then reading from Kafka might not 
be the optimal choice. Although Kafka provides tiered storage for 
offloading historical data, it still somewhat limits scalability (and 
thus throughput), because the data have to be read by a broker and only 
then passed to a consumer. The parallelism is therefore limited by the 
number of Kafka partitions and not parallelism of the Flink job. A more 
scalable approach could be to persist data from Kafka to a batch storage 
(e.g. S3 or GCS) and reprocess it from there.
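A minimal, illustrative sketch of that persist-then-reprocess idea with the Java SDK; the broker address, topic, output path, window size and shard count are all assumptions, and 'pipeline' is an existing Pipeline object:

    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.joda.time.Duration;

    // Streaming job: continuously persist the Kafka topic to a distributed filesystem.
    pipeline
        .apply("ReadKafka", KafkaIO.<String, String>read()
            .withBootstrapServers("broker:9092")
            .withTopic("events")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        .apply(Values.create())
        .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
        .apply("PersistToFiles", TextIO.write()
            .to("hdfs://namenode/events/part")   // could equally be an s3:// or gs:// path
            .withWindowedWrites()
            .withNumShards(8));
    // A separate batch pipeline can later re-read these files (e.g. with TextIO.read())
    // at whatever parallelism the storage layer supports.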


Best,

 Jan

On 6/29/24 09:12, Balogh, György wrote:

Hi,
I'm planning a distributed system with multiple kafka brokers co 
located with flink workers.
Data processing throughput for historic queries is a main KPI. So I 
want to make sure all flink workers read local data and not remote. 
I'm defining my pipelines in beam using java.

Is it possible? What are the critical config elements to achieve this?
Thank you,
Gyorgy

--

György Balogh
CTO
E   gyorgy.bal...@ultinous.com 
M   +36 30 270 8342 
A   HU, 1117 Budapest, Budafoki út 209.
W   www.ultinous.com 


Re: Exactly once KafkaIO with flink runner

2024-06-24 Thread Jan Lukavský
I don't use Kafka transactions, so I can only speculate. It seems that 
the transaction times out before being committed. Looking into the code [1], 
this could happen if there is a *huge* amount of work between checkpoints 
(i.e. checkpoints do not happen often enough). I'd suggest 
investigating the logs, looking for entries coming from the 
KafkaExactlyOnceSink.


 Jan

[1] 
https://github.com/apache/beam/blob/a944bf87cd03d32105d87fc986ecba5b656683bc/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaExactlyOnceSink.java#L245
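If infrequent checkpoints turn out to be the cause, one knob to look at is the Flink runner's checkpointing interval. A small sketch, assuming it is set inside main(); the 30-second value is an arbitrary assumption:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    // The EOS sink's commits are tied to checkpoint cadence, so checkpoint often
    // enough that the Kafka transaction timeout is not exceeded in between.
    options.setCheckpointingInterval(30_000L);  // milliseconds between checkpoints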


On 6/24/24 16:35, Ruben Vargas wrote:

On Mon, Jun 24, 2024 at 2:02 AM Jan Lukavský  wrote:

Hi,

the distribution of keys to workers might not be uniform, when the
number of keys is comparable to total parallelism. General advise would be:

   a) try to increase number of keys (EOS parallelism in this case) to be
at least several times higher than parallelism

Make sense, unfortunately I faced an error when I tried to put the
shards > partitions. :(

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat

Do I need to move any configuration to do that?

Thanks


   b) increase maxParallelism (default 128, maximum 32768), as it might
influence the assignment of keys to downstream workers

Best,

   Jan

On 6/21/24 05:25, Ruben Vargas wrote:

Image as not correctly attached. sending it again. Sorry

Thanks

On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas  wrote:

Hello guys, me again

I was trying to debug the issue with the  backpressure and I noticed
that even if I set the shards = 16, not all tasks are receiving
messages (attaching screenshot). You know potential causes and
solutions?

I really appreciate any help you can provide


Thank you very much!

Regards.


On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  wrote:

Hello again

Thank you for all the suggestions.

Unfortunately if I put more shards than partitions it throws me this exception

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat
..
..
..
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after
6ms while awaiting AddOffsetsToTxn\n",


Any other alternative? Thank you very much!

Regards

On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:

Hi,

regarding aligned vs unaligned checkpoints I recommend reading [1], it
explains it quite well. Generally, I would prefer unaligned checkpoints
in this case.

Another thing to consider is the number of shards of the EOS sink.
Because how the shards are distributed among workers, it might be good
idea to actually increase that to some number higher than number of
target partitions (e.g. targetPartitions * 10 or so). Additional thing
to consider is increasing maxParallelism of the pipeline (e.g. max value
is 32768), as it also affects how 'evenly' Flink assigns shards to
workers. You can check if the assignment is even using counters in the
sink operator(s).

Jan

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/

On 6/19/24 05:15, Ruben Vargas wrote:

Hello guys

Now I was able to pass that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory(config))

This is because my cluster uses SASL authentication mechanism, and the
small consumer created to fetch the topics metadata was throwing that
error.

There are other couple things I noticed:

- Now I have a lot of backpressure, I assigned x3 resources to the
cluster and even with that the back pressure is high . Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I have an error where
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic mytopic' group id myconsumergroup'

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from this error?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben 

Re: Exactly once KafkaIO with flink runner

2024-06-24 Thread Ruben Vargas
On Mon, Jun 24, 2024 at 2:02 AM Jan Lukavský  wrote:
>
> Hi,
>
> the distribution of keys to workers might not be uniform, when the
> number of keys is comparable to total parallelism. General advise would be:
>
>   a) try to increase number of keys (EOS parallelism in this case) to be
> at least several times higher than parallelism

Makes sense. Unfortunately, I faced an error when I tried to set the
shards > partitions. :(

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat

Do I need to move any configuration to do that?

Thanks

>
>   b) increase maxParallelism (default 128, maximum 32768), as it might
> influence the assignment of keys to downstream workers
>
> Best,
>
>   Jan
>
> On 6/21/24 05:25, Ruben Vargas wrote:
> > Image as not correctly attached. sending it again. Sorry
> >
> > Thanks
> >
> > On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas  
> > wrote:
> >> Hello guys, me again
> >>
> >> I was trying to debug the issue with the  backpressure and I noticed
> >> that even if I set the shards = 16, not all tasks are receiving
> >> messages (attaching screenshot). You know potential causes and
> >> solutions?
> >>
> >> I really appreciate any help you can provide
> >>
> >>
> >> Thank you very much!
> >>
> >> Regards.
> >>
> >>
> >> On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  
> >> wrote:
> >>> Hello again
> >>>
> >>> Thank you for all the suggestions.
> >>>
> >>> Unfortunately if I put more shards than partitions it throws me this 
> >>> exception
> >>>
> >>> "message": 
> >>> "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> >>> ids -> ToGBKResult ->
> >>> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> >>> to Kafka topic 
> >>> 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> >>> (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> >>> FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> >>> java.lang.RuntimeException:
> >>> java.lang.reflect.InvocationTargetException\n\tat
> >>> ..
> >>> ..
> >>> ..
> >>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> >>> java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> >>> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> >>> 6ms while awaiting AddOffsetsToTxn\n",
> >>>
> >>>
> >>> Any other alternative? Thank you very much!
> >>>
> >>> Regards
> >>>
> >>> On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:
>  Hi,
> 
>  regarding aligned vs unaligned checkpoints I recommend reading [1], it
>  explains it quite well. Generally, I would prefer unaligned checkpoints
>  in this case.
> 
>  Another thing to consider is the number of shards of the EOS sink.
>  Because how the shards are distributed among workers, it might be good
>  idea to actually increase that to some number higher than number of
>  target partitions (e.g. targetPartitions * 10 or so). Additional thing
>  to consider is increasing maxParallelism of the pipeline (e.g. max value
>  is 32768), as it also affects how 'evenly' Flink assigns shards to
>  workers. You can check if the assignment is even using counters in the
>  sink operator(s).
> 
> Jan
> 
>  [1]
>  https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> 
>  On 6/19/24 05:15, Ruben Vargas wrote:
> > Hello guys
> >
> > Now I was able to pass that error.
> >
> > I had to set the consumer factory function
> > .withConsumerFactoryFn(new KafkaConsumerFactory(config))
> >
> > This is because my cluster uses SASL authentication mechanism, and the
> > small consumer created to fetch the topics metadata was throwing that
> > error.
> >
> > There are other couple things I noticed:
> >
> >- Now I have a lot of backpressure, I assigned x3 resources to the
> > cluster and even with that the back pressure is high . Any advice on
> > this? I already increased the shards to equal the number of partitions
> > of the destination topic.
> >
> > - I have an error where
> > "State exists for shard mytopic-0, but there is no state stored with
> > Kafka topic mytopic' group id myconsumergroup'
> >
> > The only way I found to recover from this error is to change the group
> > name. Any other advice on how to recover from this error?
> >
> >
> > Thank you very 

Re: Exactly once KafkaIO with flink runner

2024-06-24 Thread Jan Lukavský

Hi,

the distribution of keys to workers might not be uniform when the 
number of keys is comparable to the total parallelism. General advice would be:


 a) try to increase the number of keys (EOS parallelism in this case) to be 
at least several times higher than the parallelism


 b) increase maxParallelism (default 128, maximum 32768), as it might 
influence the assignment of keys to downstream workers
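Roughly, the two knobs above map onto the following; the values, broker, topic and the 'kvRecords' PCollection<KV<String, String>> are illustrative assumptions, and note that earlier in this thread raising the shard count above the topic's partition count produced errors:

    import org.apache.beam.runners.flink.FlinkPipelineOptions;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.common.serialization.StringSerializer;

    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setMaxParallelism(32768);  // b) affects how keys/shards are assigned to workers

    // a) numShards defines the EOS key space; several times the pipeline parallelism.
    kvRecords.apply(KafkaIO.<String, String>write()
        .withBootstrapServers("broker:9092")
        .withTopic("output-topic")
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        .withEOS(64, "my-eos-sink-group"));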


Best,

 Jan

On 6/21/24 05:25, Ruben Vargas wrote:

Image as not correctly attached. sending it again. Sorry

Thanks

On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas  wrote:

Hello guys, me again

I was trying to debug the issue with the  backpressure and I noticed
that even if I set the shards = 16, not all tasks are receiving
messages (attaching screenshot). You know potential causes and
solutions?

I really appreciate any help you can provide


Thank you very much!

Regards.


On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  wrote:

Hello again

Thank you for all the suggestions.

Unfortunately if I put more shards than partitions it throws me this exception

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat
..
..
..
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after
6ms while awaiting AddOffsetsToTxn\n",


Any other alternative? Thank you very much!

Regards

On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:

Hi,

regarding aligned vs unaligned checkpoints I recommend reading [1], it
explains it quite well. Generally, I would prefer unaligned checkpoints
in this case.

Another thing to consider is the number of shards of the EOS sink.
Because how the shards are distributed among workers, it might be good
idea to actually increase that to some number higher than number of
target partitions (e.g. targetPartitions * 10 or so). Additional thing
to consider is increasing maxParallelism of the pipeline (e.g. max value
is 32768), as it also affects how 'evenly' Flink assigns shards to
workers. You can check if the assignment is even using counters in the
sink operator(s).

   Jan

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/

On 6/19/24 05:15, Ruben Vargas wrote:

Hello guys

Now I was able to pass that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory(config))

This is because my cluster uses SASL authentication mechanism, and the
small consumer created to fetch the topics metadata was throwing that
error.

There are other couple things I noticed:

   - Now I have a lot of backpressure, I assigned x3 resources to the
cluster and even with that the back pressure is high . Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I have an error where
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic mytopic' group id myconsumergroup'

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from this error?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  wrote:

Hello Jan

Thanks for the suggestions

Any benefit of using aligned vs unaligned?


At the end I found one problem that was preventing  flink from doing
the checkpointing. It was a DoFn function that has some "non
serializable" objects, so I made those transient and initialized those
on the setup.

Weird, because I usually was able to detect these kinds of errors just
running in the direct runner, or even in flink before enabling EOS.


Now I'm facing another weird issue

org.apache.beam.sdk.util.UserCodeException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
expired before the last committed offset for partitions
[behavioral-signals-6] could be determined. Try tuning
default.api.timeout.ms larger to relax the threshold.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried to extend the timeout and it didn't work, my shards are equal
to my number of partitions.

I appreciate any kind of guidance

Thanks.

On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský  wrote:

I'd suggest:
   a) use unaligned 

Re: Exactly once KafkaIO with flink runner

2024-06-20 Thread Ruben Vargas
The image was not correctly attached. Sending it again. Sorry

Thanks

On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas  wrote:
>
> Hello guys, me again
>
> I was trying to debug the issue with the  backpressure and I noticed
> that even if I set the shards = 16, not all tasks are receiving
> messages (attaching screenshot). You know potential causes and
> solutions?
>
> I really appreciate any help you can provide
>
>
> Thank you very much!
>
> Regards.
>
>
> On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  wrote:
> >
> > Hello again
> >
> > Thank you for all the suggestions.
> >
> > Unfortunately if I put more shards than partitions it throws me this 
> > exception
> >
> > "message": 
> > "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> > ids -> ToGBKResult ->
> > PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> > to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> > (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> > FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> > java.lang.RuntimeException:
> > java.lang.reflect.InvocationTargetException\n\tat
> > ..
> > ..
> > ..
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> > java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> > org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> > 6ms while awaiting AddOffsetsToTxn\n",
> >
> >
> > Any other alternative? Thank you very much!
> >
> > Regards
> >
> > On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:
> > >
> > > Hi,
> > >
> > > regarding aligned vs unaligned checkpoints I recommend reading [1], it
> > > explains it quite well. Generally, I would prefer unaligned checkpoints
> > > in this case.
> > >
> > > Another thing to consider is the number of shards of the EOS sink.
> > > Because how the shards are distributed among workers, it might be good
> > > idea to actually increase that to some number higher than number of
> > > target partitions (e.g. targetPartitions * 10 or so). Additional thing
> > > to consider is increasing maxParallelism of the pipeline (e.g. max value
> > > is 32768), as it also affects how 'evenly' Flink assigns shards to
> > > workers. You can check if the assignment is even using counters in the
> > > sink operator(s).
> > >
> > >   Jan
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> > >
> > > On 6/19/24 05:15, Ruben Vargas wrote:
> > > > Hello guys
> > > >
> > > > Now I was able to pass that error.
> > > >
> > > > I had to set the consumer factory function
> > > > .withConsumerFactoryFn(new KafkaConsumerFactory(config))
> > > >
> > > > This is because my cluster uses SASL authentication mechanism, and the
> > > > small consumer created to fetch the topics metadata was throwing that
> > > > error.
> > > >
> > > > There are other couple things I noticed:
> > > >
> > > >   - Now I have a lot of backpressure, I assigned x3 resources to the
> > > > cluster and even with that the back pressure is high . Any advice on
> > > > this? I already increased the shards to equal the number of partitions
> > > > of the destination topic.
> > > >
> > > > - I have an error where
> > > > "State exists for shard mytopic-0, but there is no state stored with
> > > > Kafka topic mytopic' group id myconsumergroup'
> > > >
> > > > The only way I found to recover from this error is to change the group
> > > > name. Any other advice on how to recover from this error?
> > > >
> > > >
> > > > Thank you very much for following this up!
> > > >
> > > > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  
> > > > wrote:
> > > >> Hello Jan
> > > >>
> > > >> Thanks for the suggestions
> > > >>
> > > >> Any benefit of using aligned vs unaligned?
> > > >>
> > > >>
> > > >> At the end I found one problem that was preventing  flink from doing
> > > >> the checkpointing. It was a DoFn function that has some "non
> > > >> serializable" objects, so I made those transient and initialized those
> > > >> on the setup.
> > > >>
> > > >> Weird, because I usually was able to detect these kinds of errors just
> > > >> running in the direct runner, or even in flink before enabling EOS.
> > > >>
> > > >>
> > > >> Now I'm facing another weird issue
> > > >>
> > > >> org.apache.beam.sdk.util.UserCodeException:
> > > >> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> > > >> expired before the last committed offset for partitions
> > > >> [behavioral-signals-6] could be determined. Try tuning
> > > >> default.api.timeout.ms larger to relax the threshold.
> > > >> at 
> > > >> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> > > >> at 
> > > >> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> > > >> Source)
> > > >> at 
> > > >> 

Re: Exactly once KafkaIO with flink runner

2024-06-20 Thread Ruben Vargas
Hello guys, me again

I was trying to debug the issue with the backpressure and I noticed
that even if I set the shards = 16, not all tasks are receiving
messages (attaching a screenshot). Do you know potential causes and
solutions?

I really appreciate any help you can provide


Thank you very much!

Regards.


On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  wrote:
>
> Hello again
>
> Thank you for all the suggestions.
>
> Unfortunately if I put more shards than partitions it throws me this exception
>
> "message": 
> "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> ids -> ToGBKResult ->
> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException\n\tat
> ..
> ..
> ..
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> 6ms while awaiting AddOffsetsToTxn\n",
>
>
> Any other alternative? Thank you very much!
>
> Regards
>
> On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:
> >
> > Hi,
> >
> > regarding aligned vs unaligned checkpoints I recommend reading [1], it
> > explains it quite well. Generally, I would prefer unaligned checkpoints
> > in this case.
> >
> > Another thing to consider is the number of shards of the EOS sink.
> > Because how the shards are distributed among workers, it might be good
> > idea to actually increase that to some number higher than number of
> > target partitions (e.g. targetPartitions * 10 or so). Additional thing
> > to consider is increasing maxParallelism of the pipeline (e.g. max value
> > is 32768), as it also affects how 'evenly' Flink assigns shards to
> > workers. You can check if the assignment is even using counters in the
> > sink operator(s).
> >
> >   Jan
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> >
> > On 6/19/24 05:15, Ruben Vargas wrote:
> > > Hello guys
> > >
> > > Now I was able to pass that error.
> > >
> > > I had to set the consumer factory function
> > > .withConsumerFactoryFn(new KafkaConsumerFactory(config))
> > >
> > > This is because my cluster uses SASL authentication mechanism, and the
> > > small consumer created to fetch the topics metadata was throwing that
> > > error.
> > >
> > > There are other couple things I noticed:
> > >
> > >   - Now I have a lot of backpressure, I assigned x3 resources to the
> > > cluster and even with that the back pressure is high . Any advice on
> > > this? I already increased the shards to equal the number of partitions
> > > of the destination topic.
> > >
> > > - I have an error where
> > > "State exists for shard mytopic-0, but there is no state stored with
> > > Kafka topic mytopic' group id myconsumergroup'
> > >
> > > The only way I found to recover from this error is to change the group
> > > name. Any other advice on how to recover from this error?
> > >
> > >
> > > Thank you very much for following this up!
> > >
> > > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  
> > > wrote:
> > >> Hello Jan
> > >>
> > >> Thanks for the suggestions
> > >>
> > >> Any benefit of using aligned vs unaligned?
> > >>
> > >>
> > >> At the end I found one problem that was preventing  flink from doing
> > >> the checkpointing. It was a DoFn function that has some "non
> > >> serializable" objects, so I made those transient and initialized those
> > >> on the setup.
> > >>
> > >> Weird, because I usually was able to detect these kinds of errors just
> > >> running in the direct runner, or even in flink before enabling EOS.
> > >>
> > >>
> > >> Now I'm facing another weird issue
> > >>
> > >> org.apache.beam.sdk.util.UserCodeException:
> > >> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> > >> expired before the last committed offset for partitions
> > >> [behavioral-signals-6] could be determined. Try tuning
> > >> default.api.timeout.ms larger to relax the threshold.
> > >> at 
> > >> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> > >> at 
> > >> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> > >> Source)
> > >> at 
> > >> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> > >>
> > >> I tried to extend the timeout and it didn't work, my shards are equal
> > >> to my number of partitions.
> > >>
> > >> I appreciate any kind of guidance
> > >>
> > >> Thanks.
> > >>
> > >> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský  wrote:
> > >>> I'd suggest:
> > >>>   a) use unaligned 

Re: Parallelism of a side input

2024-06-20 Thread Ruben Vargas
The only bad thing about this approach is that, at least in the Flink
runner, it consumes a task slot :(

El El mié, 12 de jun de 2024 a la(s) 9:38 a.m., Robert Bradshaw <
rober...@google.com> escribió:

> On Wed, Jun 12, 2024 at 7:56 AM Ruben Vargas 
> wrote:
> >
> > The approach looks good. but one question
> >
> > My understanding is that this will schedule for example 8 operators
> across the workers, but only one of them will be processing, the others
> remain idle? Are those consuming resources in some way? I'm assuming may be
> is not significant.
>
> That is correct, but the resources consumed by an idle operator should
> be negligible.
>
> > Thanks.
> >
> > El El vie, 7 de jun de 2024 a la(s) 3:56 p.m., Robert Bradshaw via user <
> user@beam.apache.org> escribió:
> >>
> >> You can always limit the parallelism by assigning a single key to
> >> every element and then doing a grouping or reshuffle[1] on that key
> >> before processing the elements. Even if the operator parallelism for
> >> that step is technically, say, eight, your effective parallelism will
> >> be exactly one.
> >>
> >> [1]
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html
> >>
> >> On Fri, Jun 7, 2024 at 2:13 PM Ruben Vargas 
> wrote:
> >> >
> >> > Hello guys
> >> >
> >> > One question, I have a side input which fetches an endpoint each 30
> >> > min, I pretty much copied the example here:
> >> > https://beam.apache.org/documentation/patterns/side-inputs/ but added
> >> > some logic to fetch the endpoint and parse the payload.
> >> >
> >> > My question is: it is possible to control the parallelism of this
> >> > single ParDo that does the fetch/transform? I don't think I need a lot
> >> > of parallelism for that one. I'm currently using flink runner and I
> >> > see the parallelism is 8 (which is the general parallelism for my
> >> > flink cluster).
> >> >
> >> > Is it possible to set it to 1 for example?
> >> >
> >> >
> >> > Regards.
>
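
For reference, a minimal Java sketch of the single-key trick Robert describes
above: give every element the same constant key, reshuffle on that key, and
then run the ParDo, so its effective parallelism is one regardless of the
operator parallelism. The class name, element type, and the fetch placeholder
are illustrative assumptions, not code from this thread.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;

public class SingleKeyFetch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of("tick"))
        // Every element gets the same constant key...
        .apply(WithKeys.of("singleton"))
        // ...so after the reshuffle they all land behind one key, giving an
        // effective parallelism of one even if the operator parallelism is 8.
        .apply(Reshuffle.of())
        .apply(Values.create())
        .apply(ParDo.of(
            new DoFn<String, String>() {
              @ProcessElement
              public void process(@Element String e, OutputReceiver<String> out) {
                // Placeholder for the endpoint fetch / parse logic.
                out.output(e);
              }
            }));
    p.run().waitUntilFinish();
  }
}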


Re: Exactly once KafkaIO with flink runner

2024-06-19 Thread Ruben Vargas
Hello again

Thank you for all the suggestions.

Unfortunately, if I put more shards than partitions, it throws this exception:

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat
..
..
..
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after
6ms while awaiting AddOffsetsToTxn\n",


Any other alternative? Thank you very much!

Regards

On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:
>
> Hi,
>
> regarding aligned vs unaligned checkpoints I recommend reading [1], it
> explains it quite well. Generally, I would prefer unaligned checkpoints
> in this case.
>
> Another thing to consider is the number of shards of the EOS sink.
> Because how the shards are distributed among workers, it might be good
> idea to actually increase that to some number higher than number of
> target partitions (e.g. targetPartitions * 10 or so). Additional thing
> to consider is increasing maxParallelism of the pipeline (e.g. max value
> is 32768), as it also affects how 'evenly' Flink assigns shards to
> workers. You can check if the assignment is even using counters in the
> sink operator(s).
>
>   Jan
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
>
> On 6/19/24 05:15, Ruben Vargas wrote:
> > Hello guys
> >
> > Now I was able to pass that error.
> >
> > I had to set the consumer factory function
> > .withConsumerFactoryFn(new KafkaConsumerFactory(config))
> >
> > This is because my cluster uses SASL authentication mechanism, and the
> > small consumer created to fetch the topics metadata was throwing that
> > error.
> >
> > There are other couple things I noticed:
> >
> >   - Now I have a lot of backpressure, I assigned x3 resources to the
> > cluster and even with that the back pressure is high . Any advice on
> > this? I already increased the shards to equal the number of partitions
> > of the destination topic.
> >
> > - I have an error where
> > "State exists for shard mytopic-0, but there is no state stored with
> > Kafka topic mytopic' group id myconsumergroup'
> >
> > The only way I found to recover from this error is to change the group
> > name. Any other advice on how to recover from this error?
> >
> >
> > Thank you very much for following this up!
> >
> > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  
> > wrote:
> >> Hello Jan
> >>
> >> Thanks for the suggestions
> >>
> >> Any benefit of using aligned vs unaligned?
> >>
> >>
> >> At the end I found one problem that was preventing  flink from doing
> >> the checkpointing. It was a DoFn function that has some "non
> >> serializable" objects, so I made those transient and initialized those
> >> on the setup.
> >>
> >> Weird, because I usually was able to detect these kinds of errors just
> >> running in the direct runner, or even in flink before enabling EOS.
> >>
> >>
> >> Now I'm facing another weird issue
> >>
> >> org.apache.beam.sdk.util.UserCodeException:
> >> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> >> expired before the last committed offset for partitions
> >> [behavioral-signals-6] could be determined. Try tuning
> >> default.api.timeout.ms larger to relax the threshold.
> >> at 
> >> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> >> at 
> >> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> >> Source)
> >> at 
> >> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> >>
> >> I tried to extend the timeout and it didn't work, my shards are equal
> >> to my number of partitions.
> >>
> >> I appreciate any kind of guidance
> >>
> >> Thanks.
> >>
> >> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský  wrote:
> >>> I'd suggest:
> >>>   a) use unaligned checkpoints, if possible
> >>>
> >>>   b) verify the number of buckets you use for EOS sink, this limits 
> >>> parallelism [1].
> >>>
> >>> Best,
> >>>
> >>>   Jan
> >>>
> >>> [1] 
> >>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> >>>
> >>> On 6/18/24 09:32, Ruben Vargas wrote:
> >>>
> >>> Hello Lukavsky
> >>>
> >>> Thanks for your reply !
> >>>
> >>> I thought was due backpreassure but i increased the resources of the 
> >>> cluster and problem still presist. More that that, data stop flowing and 
> >>> 

Re: KafkaIO metric publishing

2024-06-19 Thread XQ Hu via user
Is your job a Dataflow Template job?

The error is caused by
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DataflowTemplateJob.java#L55
.

So basically DataflowTemplateJob does not support metrics.
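
For a job that is not launched as a classic template, the kind of post-run
metric query attempted in the quoted message below would look roughly like
this sketch; the "KafkaIO" namespace string and the Create placeholder are
assumptions, not values from this thread.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.metrics.MetricNameFilter;
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricResult;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.transforms.Create;

public class QueryJobMetrics {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    p.apply(Create.of("placeholder"));  // stands in for the real KafkaIO pipeline

    PipelineResult result = p.run();
    result.waitUntilFinish();

    // Query metrics after the run and filter them before forwarding anywhere.
    MetricQueryResults metrics =
        result.metrics().queryMetrics(
            MetricsFilter.builder()
                .addNameFilter(MetricNameFilter.inNamespace("KafkaIO"))  // assumed namespace
                .build());
    for (MetricResult<Long> counter : metrics.getCounters()) {
      System.out.println(counter.getName() + " = " + counter.getAttempted());
    }
  }
}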


On Wed, Jun 19, 2024 at 3:57 AM Lahiru Ginnaliya Gamathige <
glah...@gmail.com> wrote:

> Hi Users,
>
> In Google Cloud monitoring there is a limit of 100 metrics and when we are
> using KafkaIO, the library publishes a bunch of metrics per topic. With our
> use we will easily run out of 100 metric limit.
>
> We want to stop KafkaIO from publishing metrics and I do not see this is
> configurable. So I am trying to write a metric filtering logic (we are
> using beam version 2.55.1).
> I wrote a Sink but when I try to find a way to register the sink I cannot
> see a way to do the following in this beam version,
>
> *MetricsEnvironment.setMetricsSink(new
> CustomMetricsSink(options.getProject()));*
>
> Then I tried to register it like this,
>
> PipelineResult results = run(options);
> results.waitUntilFinish();
>
>
>
> *   MetricQueryResults metricQueryResults =
> results.metrics().queryMetrics(MetricsFilter.builder().build());
> CustomMetricSink reporter = new CustomMetricSink(options.getProject());
> reporter.writeMetrics(metricQueryResults);*
>
> With the above code pipeline is failing to start with the 
> error(java.lang.UnsupportedOperationException:
> The result of template creation should not be used.)
>
>
> Do you suggest another solution for this problem (it sounds like a quite
> common problem when using kafkaio). Or do you have any suggestion about my
> attempts ?
>
> Regards
> Lahiru
>
>


Re: Exactly once KafkaIO with flink runner

2024-06-19 Thread Jan Lukavský

Hi,

regarding aligned vs unaligned checkpoints I recommend reading [1], it 
explains it quite well. Generally, I would prefer unaligned checkpoints 
in this case.


Another thing to consider is the number of shards of the EOS sink. 
Because of how the shards are distributed among workers, it might be a good 
idea to actually increase that to some number higher than the number of 
target partitions (e.g. targetPartitions * 10 or so). An additional thing 
to consider is increasing the maxParallelism of the pipeline (the max value 
is 32768), as it also affects how 'evenly' Flink assigns shards to 
workers. You can check whether the assignment is even using counters in the 
sink operator(s).


 Jan

[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
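
A rough sketch of the two knobs described above: a withEOS shard count larger
than the number of target partitions, and a higher maxParallelism on the Flink
runner options. The broker address, topic, serializers, shard multiplier, and
the Create placeholder are assumptions; double-check the option names against
the FlinkPipelineOptions of the Beam release in use.

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.StringSerializer;

public class EosShardSizing {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setMaxParallelism(32768);  // pipeline-wide max parallelism, as suggested above
    Pipeline p = Pipeline.create(options);

    int targetPartitions = 6;               // placeholder
    int numShards = targetPartitions * 10;  // more shards than output partitions

    p.apply(Create.of(KV.of("key", "value")))  // stands in for the real upstream
        .apply(
            KafkaIO.<String, String>write()
                .withBootstrapServers("broker:9092")    // placeholder
                .withTopic("output-topic")              // placeholder
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
                .withEOS(numShards, "my-sink-group"));  // placeholder sink group id
    p.run().waitUntilFinish();
  }
}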


On 6/19/24 05:15, Ruben Vargas wrote:

Hello guys

Now I was able to pass that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory(config))

This is because my cluster uses SASL authentication mechanism, and the
small consumer created to fetch the topics metadata was throwing that
error.

There are other couple things I noticed:

  - Now I have a lot of backpressure, I assigned x3 resources to the
cluster and even with that the back pressure is high . Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I have an error where
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic mytopic' group id myconsumergroup'

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from this error?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  wrote:

Hello Jan

Thanks for the suggestions

Any benefit of using aligned vs unaligned?


At the end I found one problem that was preventing  flink from doing
the checkpointing. It was a DoFn function that has some "non
serializable" objects, so I made those transient and initialized those
on the setup.

Weird, because I usually was able to detect these kinds of errors just
running in the direct runner, or even in flink before enabling EOS.


Now I'm facing another weird issue

org.apache.beam.sdk.util.UserCodeException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
expired before the last committed offset for partitions
[behavioral-signals-6] could be determined. Try tuning
default.api.timeout.ms larger to relax the threshold.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried to extend the timeout and it didn't work, my shards are equal
to my number of partitions.

I appreciate any kind of guidance

Thanks.

On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský  wrote:

I'd suggest:
  a) use unaligned checkpoints, if possible

  b) verify the number of buckets you use for EOS sink, this limits parallelism 
[1].

Best,

  Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-

On 6/18/24 09:32, Ruben Vargas wrote:

Hello Lukavsky

Thanks for your reply !

I thought was due backpreassure but i increased the resources of the cluster 
and problem still presist. More that that, data stop flowing and the checkpoint 
still fail.

I have configured the checkpoint to do it per minute. The timeout is 1h. Is 
aligned checkpoint.

El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský  
escribió:

H Ruben,

from the provided screenshot it seems to me, that the pipeline in
backpressured by the sink. Can you please share your checkpoint
configuration? Are you using unaligned checkpoints? What is the
checkpointing interval and the volume of data coming in from the source?
With EOS data is committed after checkpoint, before that, the data is
buffered in state, which makes the sink more resource intensive.

   Jan

On 6/18/24 05:30, Ruben Vargas wrote:

Attached a better image of the console.

Thanks!

On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas  wrote:

Hello guys

Wondering if some of you have experiences enabling Exactly Once in
KafkaIO with Flink runner? I enabled it and now I'm facing an issue
where all the checkpoints are failing. I cannot see any exception on
the logs.

Flink console only mentions this "Asynchronous task checkpoint
failed." I also noticed that some operators don't acknowledge the
checkpointing  (Attached a screenshot).

I did this:

1) KafkaIO.Read:

update consumer properties with enable.auto.commit = false
.withReadCommitted()
.commitOffsetsInFinalize()

2) KafkaIO#write:

.withEOS(numShards, sinkGroupId)

But my application is not able to deliver messages to the output topic
due the checkpoint 

Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Ruben Vargas
Hello guys

Now I was able to get past that error.

I had to set the consumer factory function
.withConsumerFactoryFn(new KafkaConsumerFactory(config))

This is because my cluster uses the SASL authentication mechanism, and the
small consumer created to fetch the topic metadata was throwing that
error.

There are a couple of other things I noticed:

 - Now I have a lot of backpressure. I assigned 3x the resources to the
cluster and even with that the backpressure is high. Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I have an error where
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic mytopic' group id myconsumergroup'

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from this error?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  wrote:
>
> Hello Jan
>
> Thanks for the suggestions
>
> Any benefit of using aligned vs unaligned?
>
>
> At the end I found one problem that was preventing  flink from doing
> the checkpointing. It was a DoFn function that has some "non
> serializable" objects, so I made those transient and initialized those
> on the setup.
>
> Weird, because I usually was able to detect these kinds of errors just
> running in the direct runner, or even in flink before enabling EOS.
>
>
> Now I'm facing another weird issue
>
> org.apache.beam.sdk.util.UserCodeException:
> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> expired before the last committed offset for partitions
> [behavioral-signals-6] could be determined. Try tuning
> default.api.timeout.ms larger to relax the threshold.
> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> at 
> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
>
> I tried to extend the timeout and it didn't work, my shards are equal
> to my number of partitions.
>
> I appreciate any kind of guidance
>
> Thanks.
>
> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský  wrote:
> >
> > I'd suggest:
> >  a) use unaligned checkpoints, if possible
> >
> >  b) verify the number of buckets you use for EOS sink, this limits 
> > parallelism [1].
> >
> > Best,
> >
> >  Jan
> >
> > [1] 
> > https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> >
> > On 6/18/24 09:32, Ruben Vargas wrote:
> >
> > Hello Lukavsky
> >
> > Thanks for your reply !
> >
> > I thought was due backpreassure but i increased the resources of the 
> > cluster and problem still presist. More that that, data stop flowing and 
> > the checkpoint still fail.
> >
> > I have configured the checkpoint to do it per minute. The timeout is 1h. Is 
> > aligned checkpoint.
> >
> > El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský 
> >  escribió:
> >>
> >> H Ruben,
> >>
> >> from the provided screenshot it seems to me, that the pipeline in
> >> backpressured by the sink. Can you please share your checkpoint
> >> configuration? Are you using unaligned checkpoints? What is the
> >> checkpointing interval and the volume of data coming in from the source?
> >> With EOS data is committed after checkpoint, before that, the data is
> >> buffered in state, which makes the sink more resource intensive.
> >>
> >>   Jan
> >>
> >> On 6/18/24 05:30, Ruben Vargas wrote:
> >> > Attached a better image of the console.
> >> >
> >> > Thanks!
> >> >
> >> > On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas  
> >> > wrote:
> >> >> Hello guys
> >> >>
> >> >> Wondering if some of you have experiences enabling Exactly Once in
> >> >> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
> >> >> where all the checkpoints are failing. I cannot see any exception on
> >> >> the logs.
> >> >>
> >> >> Flink console only mentions this "Asynchronous task checkpoint
> >> >> failed." I also noticed that some operators don't acknowledge the
> >> >> checkpointing  (Attached a screenshot).
> >> >>
> >> >> I did this:
> >> >>
> >> >> 1) KafkaIO.Read:
> >> >>
> >> >> update consumer properties with enable.auto.commit = false
> >> >> .withReadCommitted()
> >> >> .commitOffsetsInFinalize()
> >> >>
> >> >> 2) KafkaIO#write:
> >> >>
> >> >> .withEOS(numShards, sinkGroupId)
> >> >>
> >> >> But my application is not able to deliver messages to the output topic
> >> >> due the checkpoint failing.
> >> >> I also reviewed the timeout and other time sensitive parameters, those
> >> >> are high right now.
> >> >>
> >> >> I really appreciate your guidance on this. Thank you


Re: (Python) How to test pipeline that raises an exception?

2024-06-18 Thread Valentyn Tymofieiev via user
Hi Jaehyeon,

for exceptions that happen at pipeline construction, it should be possible
to use unittest.assertRaises.

For errors at runtime, it should be possible to detect that the pipeline
has finished in a `FAILED` state. You can capture the state by running the
pipeline without the `with` statement, capturing pipeline_result =
p.run().wait_until_finish(), and analyzing it, or by trying
out PipelineStateMatcher; from a quick look, we have some tests that use
it:
https://github.com/apache/beam/blob/3ed91c880f85d09a45039e70d5136f1c2324916d/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py#L68
,
https://github.com/apache/beam/blob/3ed91c880f85d09a45039e70d5136f1c2324916d/sdks/python/apache_beam/examples/complete/game/hourly_team_score_it_test.py#L80
, and it should be possible to create a matcher for a failed state.


On Tue, Jun 18, 2024 at 12:22 PM Jaehyeon Kim  wrote:

> Hello,
>
> I have a unit testing case and the pipeline raises TypeError. How is it
> possible to test? (I don't find unittest assertRaises method in the beam
> testing util)
>
> Cheers,
> Jaehyeon
>
> def test_write_to_firehose_with_unsupported_types(self):
> with TestPipeline(options=self.pipeline_opts) as p:
> output = (
> p
> | beam.Create(["one", "two", "three", "four"])
> | "WriteToFirehose" >> WriteToFirehose(self.
> delivery_stream_name, True)
> | "CollectResponse" >> beam.Map(lambda e: e[
> "FailedPutCount"])
> )
>


Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Ruben Vargas
Hello Jan

Thanks for the suggestions

Any benefit of using aligned vs unaligned?


In the end I found one problem that was preventing Flink from doing
the checkpointing. It was a DoFn that had some non-serializable
objects, so I made those transient and initialized them
in setup.

Weird, because I was usually able to detect these kinds of errors just by
running in the direct runner, or even in Flink before enabling EOS.


Now I'm facing another weird issue

org.apache.beam.sdk.util.UserCodeException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
expired before the last committed offset for partitions
[behavioral-signals-6] could be determined. Try tuning
default.api.timeout.ms larger to relax the threshold.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried to extend the timeout and it didn't work; my shards are equal
to my number of partitions.

I appreciate any kind of guidance

Thanks.

On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský  wrote:
>
> I'd suggest:
>  a) use unaligned checkpoints, if possible
>
>  b) verify the number of buckets you use for EOS sink, this limits 
> parallelism [1].
>
> Best,
>
>  Jan
>
> [1] 
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
>
> On 6/18/24 09:32, Ruben Vargas wrote:
>
> Hello Lukavsky
>
> Thanks for your reply !
>
> I thought was due backpreassure but i increased the resources of the cluster 
> and problem still presist. More that that, data stop flowing and the 
> checkpoint still fail.
>
> I have configured the checkpoint to do it per minute. The timeout is 1h. Is 
> aligned checkpoint.
>
> El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský 
>  escribió:
>>
>> H Ruben,
>>
>> from the provided screenshot it seems to me, that the pipeline in
>> backpressured by the sink. Can you please share your checkpoint
>> configuration? Are you using unaligned checkpoints? What is the
>> checkpointing interval and the volume of data coming in from the source?
>> With EOS data is committed after checkpoint, before that, the data is
>> buffered in state, which makes the sink more resource intensive.
>>
>>   Jan
>>
>> On 6/18/24 05:30, Ruben Vargas wrote:
>> > Attached a better image of the console.
>> >
>> > Thanks!
>> >
>> > On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas  
>> > wrote:
>> >> Hello guys
>> >>
>> >> Wondering if some of you have experiences enabling Exactly Once in
>> >> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
>> >> where all the checkpoints are failing. I cannot see any exception on
>> >> the logs.
>> >>
>> >> Flink console only mentions this "Asynchronous task checkpoint
>> >> failed." I also noticed that some operators don't acknowledge the
>> >> checkpointing  (Attached a screenshot).
>> >>
>> >> I did this:
>> >>
>> >> 1) KafkaIO.Read:
>> >>
>> >> update consumer properties with enable.auto.commit = false
>> >> .withReadCommitted()
>> >> .commitOffsetsInFinalize()
>> >>
>> >> 2) KafkaIO#write:
>> >>
>> >> .withEOS(numShards, sinkGroupId)
>> >>
>> >> But my application is not able to deliver messages to the output topic
>> >> due the checkpoint failing.
>> >> I also reviewed the timeout and other time sensitive parameters, those
>> >> are high right now.
>> >>
>> >> I really appreciate your guidance on this. Thank you


Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Jan Lukavský

I'd suggest:
 a) use unaligned checkpoints, if possible

 b) verify the number of buckets you use for the EOS sink, as this limits 
parallelism [1].


Best,

 Jan

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
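
A minimal sketch of where the checkpoint-related settings live when launching
on the Flink runner. The interval and timeout values are placeholders, the
option names should be checked against the FlinkPipelineOptions of the Beam
release in use, and unaligned checkpoints themselves are enabled through the
Flink configuration (e.g. execution.checkpointing.unaligned: true in
flink-conf.yaml on Flink 1.11+) rather than in this snippet.

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class FlinkCheckpointSettings {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
    options.setCheckpointingInterval(60_000L);    // checkpoint every minute (placeholder)
    options.setCheckpointTimeoutMillis(600_000L); // generous timeout (placeholder)

    Pipeline p = Pipeline.create(options);
    p.apply(Create.of("placeholder"));  // the KafkaIO read/write with .withEOS(...) goes here
    p.run().waitUntilFinish();
  }
}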


On 6/18/24 09:32, Ruben Vargas wrote:

Hello Lukavsky

Thanks for your reply !

I thought was due backpreassure but i increased the resources of the 
cluster and problem still presist. More that that, data stop flowing 
and the checkpoint still fail.


I have configured the checkpoint to do it per minute. The timeout is 
1h. Is aligned checkpoint.


El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský 
 escribió:


H Ruben,

from the provided screenshot it seems to me, that the pipeline in
backpressured by the sink. Can you please share your checkpoint
configuration? Are you using unaligned checkpoints? What is the
checkpointing interval and the volume of data coming in from the
source?
With EOS data is committed after checkpoint, before that, the data is
buffered in state, which makes the sink more resource intensive.

  Jan

On 6/18/24 05:30, Ruben Vargas wrote:
> Attached a better image of the console.
>
> Thanks!
>
> On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas
 wrote:
>> Hello guys
>>
>> Wondering if some of you have experiences enabling Exactly Once in
>> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
>> where all the checkpoints are failing. I cannot see any
exception on
>> the logs.
>>
>> Flink console only mentions this "Asynchronous task checkpoint
>> failed." I also noticed that some operators don't acknowledge the
>> checkpointing  (Attached a screenshot).
>>
>> I did this:
>>
>> 1) KafkaIO.Read:
>>
>> update consumer properties with enable.auto.commit = false
>> .withReadCommitted()
>> .commitOffsetsInFinalize()
>>
>> 2) KafkaIO#write:
>>
>> .withEOS(numShards, sinkGroupId)
>>
>> But my application is not able to deliver messages to the
output topic
>> due the checkpoint failing.
>> I also reviewed the timeout and other time sensitive
parameters, those
>> are high right now.
>>
>> I really appreciate your guidance on this. Thank you


Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Ruben Vargas
Hello Lukavsky

Thanks for your reply !

I thought it was due to backpressure, but I increased the resources of the
cluster and the problem still persists. More than that, data stops flowing and
the checkpoint still fails.

I have configured checkpointing to run every minute. The timeout is 1h. It is
an aligned checkpoint.

El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský <
je...@seznam.cz> escribió:

> H Ruben,
>
> from the provided screenshot it seems to me, that the pipeline in
> backpressured by the sink. Can you please share your checkpoint
> configuration? Are you using unaligned checkpoints? What is the
> checkpointing interval and the volume of data coming in from the source?
> With EOS data is committed after checkpoint, before that, the data is
> buffered in state, which makes the sink more resource intensive.
>
>   Jan
>
> On 6/18/24 05:30, Ruben Vargas wrote:
> > Attached a better image of the console.
> >
> > Thanks!
> >
> > On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas 
> wrote:
> >> Hello guys
> >>
> >> Wondering if some of you have experiences enabling Exactly Once in
> >> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
> >> where all the checkpoints are failing. I cannot see any exception on
> >> the logs.
> >>
> >> Flink console only mentions this "Asynchronous task checkpoint
> >> failed." I also noticed that some operators don't acknowledge the
> >> checkpointing  (Attached a screenshot).
> >>
> >> I did this:
> >>
> >> 1) KafkaIO.Read:
> >>
> >> update consumer properties with enable.auto.commit = false
> >> .withReadCommitted()
> >> .commitOffsetsInFinalize()
> >>
> >> 2) KafkaIO#write:
> >>
> >> .withEOS(numShards, sinkGroupId)
> >>
> >> But my application is not able to deliver messages to the output topic
> >> due the checkpoint failing.
> >> I also reviewed the timeout and other time sensitive parameters, those
> >> are high right now.
> >>
> >> I really appreciate your guidance on this. Thank you
>


Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Jan Lukavský

Hi Ruben,

from the provided screenshot it seems to me that the pipeline is 
backpressured by the sink. Can you please share your checkpoint 
configuration? Are you using unaligned checkpoints? What are the 
checkpointing interval and the volume of data coming in from the source? 
With EOS, data is committed after the checkpoint; before that, the data is 
buffered in state, which makes the sink more resource intensive.


 Jan

On 6/18/24 05:30, Ruben Vargas wrote:

Attached a better image of the console.

Thanks!

On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas  wrote:

Hello guys

Wondering if some of you have experiences enabling Exactly Once in
KafkaIO with Flink runner? I enabled it and now I'm facing an issue
where all the checkpoints are failing. I cannot see any exception on
the logs.

Flink console only mentions this "Asynchronous task checkpoint
failed." I also noticed that some operators don't acknowledge the
checkpointing  (Attached a screenshot).

I did this:

1) KafkaIO.Read:

update consumer properties with enable.auto.commit = false
.withReadCommitted()
.commitOffsetsInFinalize()

2) KafkaIO#write:

.withEOS(numShards, sinkGroupId)

But my application is not able to deliver messages to the output topic
due the checkpoint failing.
I also reviewed the timeout and other time sensitive parameters, those
are high right now.

I really appreciate your guidance on this. Thank you
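
For reference, a compact sketch of the two configuration steps listed in the
message above: the read-side consumer settings and the exactly-once write.
Bootstrap servers, topics, group ids, the shard count, and the
(de)serializers are placeholders, and a Java 9+ Map.of is assumed for the
consumer config update.

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class EosKafkaPipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    p.apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")  // placeholder
                .withTopic("input-topic")             // placeholder
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // 1) read side: no auto-commit, read committed, commit in finalize
                .withConsumerConfigUpdates(
                    Map.of("enable.auto.commit", false, "group.id", "my-consumer-group"))
                .withReadCommitted()
                .commitOffsetsInFinalize()
                .withoutMetadata())
        // ... application transforms go here ...
        .apply(
            KafkaIO.<String, String>write()
                .withBootstrapServers("broker:9092")  // placeholder
                .withTopic("output-topic")            // placeholder
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
                // 2) write side: exactly-once sink with a fixed shard count
                .withEOS(4, "my-sink-group"));        // placeholders
    p.run().waitUntilFinish();
  }
}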


Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-16 Thread Sofia’s World
Thanks. It appears that I did not fully read the documentation and I missed
this in my dataflow flex-template run:

, '--parameters'
  , 'sdk_container_image=$_SDK_CONTAINER_IMAGE'

All my other jobs use a dodgy Dockerfile which does not require the
parameter above...
I should be fine for the time being; at least my pipeline is no longer
plagued by import errors.
Thanks all for helping out.

kind regards
Marco




On Sun, Jun 16, 2024 at 6:27 PM Utkarsh Parekh 
wrote:

> You have “mypackage” incorrectly built. Please check and confirm that.
>
> Utkarsh
>
> On Sun, Jun 16, 2024 at 12:48 PM Sofia’s World 
> wrote:
>
>> Error is same...- see bottom -
>> i have tried to ssh in the container and the directory is setup as
>> expected.. so not quite sure where the issue is
>> i will try to start from the pipeline with dependencies sample and work
>> out from there  w.o bothering the list
>>
>> thanks again for following up
>>  Marco
>>
>> Could not load main session. Inspect which external dependencies are used
>> in the main module of your pipeline. Verify that corresponding packages are
>> installed in the pipeline runtime environment and their installed versions
>> match the versions used in pipeline submission environment. For more
>> information, see:
>> https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
>> Traceback (most recent call last): File
>> "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
>> line 115, in create_harness _load_main_session(semi_persistent_directory)
>> File
>> "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
>> line 354, in _load_main_session pickler.load_session(session_file) File
>> "/usr/local/lib/python3.11/site-packages/apache_beam/internal/pickler.py",
>> line 65, in load_session return desired_pickle_lib.load_session(file_path)
>> ^^ File
>> "/usr/local/lib/python3.11/site-packages/apache_beam/internal/dill_pickler.py",
>> line 446, in load_session return dill.load_session(file_path)
>>  File
>> "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 368, in
>> load_session module = unpickler.load()  File
>> "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 472, in load
>> obj = StockUnpickler.load(self) ^ File
>> "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 827, in
>> _import_module return getattr(__import__(module, None, None, [obj]), obj)
>> ^ ModuleNotFoundError: No module named
>> 'mypackage'
>>
>>
>>
>> On Sun, 16 Jun 2024, 14:50 XQ Hu via user,  wrote:
>>
>>> What is the error message now?
>>> You can easily ssh to your docker container and check everything is
>>> installed correctly by:
>>> docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE
>>>
>>>
>>> On Sun, Jun 16, 2024 at 5:18 AM Sofia’s World 
>>> wrote:
>>>
 Valentin, many thanks... i actually spotted the reference in teh setup
 file
 However , after correcting it, i am still at square 1 where somehow my
 runtime environment does not see it.. so i added some debugging to my
 Dockerfile to check if i forgot to copy something,
 and here's the output, where i can see the mypackage has been copied

 here's my directory structure

  mypackage
 __init__.py
 obbutils.py
 launcher.py
 __init__.py
 dataflow_tester.py
 setup_dftester.py (copied to setup.py)

 i can see the directory structure has been maintained when i copy my
 files to docker as i added some debug to my dockerfile

 Step #0 - "dftester-image": Removing intermediate container 4c4e763289d2
 Step #0 - "dftester-image":  ---> cda378f70a9e
 Step #0 - "dftester-image": Step 6/23 : COPY requirements.txt .
 Step #0 - "dftester-image":  ---> 9a43da08b013
 Step #0 - "dftester-image": Step 7/23 : COPY setup_dftester.py setup.py
 Step #0 - "dftester-image":  ---> 5a6bf71df052
 Step #0 - "dftester-image": Step 8/23 : COPY dataflow_tester.py .
 Step #0 - "dftester-image":  ---> 82cfe1f1f9ed
 Step #0 - "dftester-image": Step 9/23 : COPY mypackage mypackage
 Step #0 - "dftester-image":  ---> d86497b791d0
 Step #0 - "dftester-image": Step 10/23 : COPY __init__.py
 ${WORKDIR}/__init__.py
 Step #0 - "dftester-image":  ---> 337d149d64c7
 Step #0 - "dftester-image": Step 11/23 : RUN echo '- listing
 workdir'
 Step #0 - "dftester-image":  ---> Running in 9d97d8a64319
 Step #0 - "dftester-image": - listing workdir
 Step #0 - "dftester-image": Removing intermediate container 9d97d8a64319
 Step #0 - "dftester-image":  ---> bc9a6a2aa462
 Step #0 - "dftester-image": Step 12/23 : RUN ls -la ${WORKDIR}
 Step #0 - "dftester-image":  ---> Running in cf164108f9d6
 Step #0 - "dftester-image": total 24

Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-16 Thread Utkarsh Parekh
You have “mypackage” incorrectly built. Please check and confirm that.

Utkarsh

On Sun, Jun 16, 2024 at 12:48 PM Sofia’s World  wrote:

> Error is same...- see bottom -
> i have tried to ssh in the container and the directory is setup as
> expected.. so not quite sure where the issue is
> i will try to start from the pipeline with dependencies sample and work
> out from there  w.o bothering the list
>
> thanks again for following up
>  Marco
>
> Could not load main session. Inspect which external dependencies are used
> in the main module of your pipeline. Verify that corresponding packages are
> installed in the pipeline runtime environment and their installed versions
> match the versions used in pipeline submission environment. For more
> information, see:
> https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
> Traceback (most recent call last): File
> "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
> line 115, in create_harness _load_main_session(semi_persistent_directory)
> File
> "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
> line 354, in _load_main_session pickler.load_session(session_file) File
> "/usr/local/lib/python3.11/site-packages/apache_beam/internal/pickler.py",
> line 65, in load_session return desired_pickle_lib.load_session(file_path)
> ^^ File
> "/usr/local/lib/python3.11/site-packages/apache_beam/internal/dill_pickler.py",
> line 446, in load_session return dill.load_session(file_path)
>  File
> "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 368, in
> load_session module = unpickler.load()  File
> "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 472, in load
> obj = StockUnpickler.load(self) ^ File
> "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 827, in
> _import_module return getattr(__import__(module, None, None, [obj]), obj)
> ^ ModuleNotFoundError: No module named
> 'mypackage'
>
>
>
> On Sun, 16 Jun 2024, 14:50 XQ Hu via user,  wrote:
>
>> What is the error message now?
>> You can easily ssh to your docker container and check everything is
>> installed correctly by:
>> docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE
>>
>>
>> On Sun, Jun 16, 2024 at 5:18 AM Sofia’s World 
>> wrote:
>>
>>> Valentin, many thanks... i actually spotted the reference in teh setup
>>> file
>>> However , after correcting it, i am still at square 1 where somehow my
>>> runtime environment does not see it.. so i added some debugging to my
>>> Dockerfile to check if i forgot to copy something,
>>> and here's the output, where i can see the mypackage has been copied
>>>
>>> here's my directory structure
>>>
>>>  mypackage
>>> __init__.py
>>> obbutils.py
>>> launcher.py
>>> __init__.py
>>> dataflow_tester.py
>>> setup_dftester.py (copied to setup.py)
>>>
>>> i can see the directory structure has been maintained when i copy my
>>> files to docker as i added some debug to my dockerfile
>>>
>>> Step #0 - "dftester-image": Removing intermediate container 4c4e763289d2
>>> Step #0 - "dftester-image":  ---> cda378f70a9e
>>> Step #0 - "dftester-image": Step 6/23 : COPY requirements.txt .
>>> Step #0 - "dftester-image":  ---> 9a43da08b013
>>> Step #0 - "dftester-image": Step 7/23 : COPY setup_dftester.py setup.py
>>> Step #0 - "dftester-image":  ---> 5a6bf71df052
>>> Step #0 - "dftester-image": Step 8/23 : COPY dataflow_tester.py .
>>> Step #0 - "dftester-image":  ---> 82cfe1f1f9ed
>>> Step #0 - "dftester-image": Step 9/23 : COPY mypackage mypackage
>>> Step #0 - "dftester-image":  ---> d86497b791d0
>>> Step #0 - "dftester-image": Step 10/23 : COPY __init__.py
>>> ${WORKDIR}/__init__.py
>>> Step #0 - "dftester-image":  ---> 337d149d64c7
>>> Step #0 - "dftester-image": Step 11/23 : RUN echo '- listing workdir'
>>> Step #0 - "dftester-image":  ---> Running in 9d97d8a64319
>>> Step #0 - "dftester-image": - listing workdir
>>> Step #0 - "dftester-image": Removing intermediate container 9d97d8a64319
>>> Step #0 - "dftester-image":  ---> bc9a6a2aa462
>>> Step #0 - "dftester-image": Step 12/23 : RUN ls -la ${WORKDIR}
>>> Step #0 - "dftester-image":  ---> Running in cf164108f9d6
>>> Step #0 - "dftester-image": total 24
>>> Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 .
>>> Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 ..
>>> Step #0 - "dftester-image": -rw-r--r-- 1 root root0 Jun 16 08:57
>>> __init__.py
>>> Step #0 - "dftester-image": -rw-r--r-- 1 root root  135 Jun 16 08:57
>>> dataflow_tester.py
>>> Step #0 - "dftester-image": drwxr-xr-x 2 root root 4096 Jun 16 08:59
>>> mypackage
>>> Step #0 - "dftester-image": -rw-r--r-- 1 root root   64 Jun 16 08:57
>>> requirements.txt
>>> Step #0 - "dftester-image": -rw-r--r-- 1 root root  736 Jun 16 08:57
>>> 

Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-16 Thread Sofia’s World
The error is the same (see bottom).
I have tried to ssh into the container and the directory is set up as
expected, so I'm not quite sure where the issue is.
I will try to start from the pipeline-with-dependencies sample and work it
out from there without bothering the list.

thanks again for following up
 Marco

Could not load main session. Inspect which external dependencies are used
in the main module of your pipeline. Verify that corresponding packages are
installed in the pipeline runtime environment and their installed versions
match the versions used in pipeline submission environment. For more
information, see:
https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
Traceback (most recent call last): File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
line 115, in create_harness _load_main_session(semi_persistent_directory)
File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
line 354, in _load_main_session pickler.load_session(session_file) File
"/usr/local/lib/python3.11/site-packages/apache_beam/internal/pickler.py",
line 65, in load_session return desired_pickle_lib.load_session(file_path)
^^ File
"/usr/local/lib/python3.11/site-packages/apache_beam/internal/dill_pickler.py",
line 446, in load_session return dill.load_session(file_path)
 File
"/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 368, in
load_session module = unpickler.load()  File
"/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self) ^ File
"/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 827, in
_import_module return getattr(__import__(module, None, None, [obj]), obj)
^ ModuleNotFoundError: No module named
'mypackage'



On Sun, 16 Jun 2024, 14:50 XQ Hu via user,  wrote:

> What is the error message now?
> You can easily ssh to your docker container and check everything is
> installed correctly by:
> docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE
>
>
> On Sun, Jun 16, 2024 at 5:18 AM Sofia’s World  wrote:
>
>> Valentin, many thanks... i actually spotted the reference in teh setup
>> file
>> However , after correcting it, i am still at square 1 where somehow my
>> runtime environment does not see it.. so i added some debugging to my
>> Dockerfile to check if i forgot to copy something,
>> and here's the output, where i can see the mypackage has been copied
>>
>> here's my directory structure
>>
>>  mypackage
>> __init__.py
>> obbutils.py
>> launcher.py
>> __init__.py
>> dataflow_tester.py
>> setup_dftester.py (copied to setup.py)
>>
>> i can see the directory structure has been maintained when i copy my
>> files to docker as i added some debug to my dockerfile
>>
>> Step #0 - "dftester-image": Removing intermediate container 4c4e763289d2
>> Step #0 - "dftester-image":  ---> cda378f70a9e
>> Step #0 - "dftester-image": Step 6/23 : COPY requirements.txt .
>> Step #0 - "dftester-image":  ---> 9a43da08b013
>> Step #0 - "dftester-image": Step 7/23 : COPY setup_dftester.py setup.py
>> Step #0 - "dftester-image":  ---> 5a6bf71df052
>> Step #0 - "dftester-image": Step 8/23 : COPY dataflow_tester.py .
>> Step #0 - "dftester-image":  ---> 82cfe1f1f9ed
>> Step #0 - "dftester-image": Step 9/23 : COPY mypackage mypackage
>> Step #0 - "dftester-image":  ---> d86497b791d0
>> Step #0 - "dftester-image": Step 10/23 : COPY __init__.py
>> ${WORKDIR}/__init__.py
>> Step #0 - "dftester-image":  ---> 337d149d64c7
>> Step #0 - "dftester-image": Step 11/23 : RUN echo '- listing workdir'
>> Step #0 - "dftester-image":  ---> Running in 9d97d8a64319
>> Step #0 - "dftester-image": - listing workdir
>> Step #0 - "dftester-image": Removing intermediate container 9d97d8a64319
>> Step #0 - "dftester-image":  ---> bc9a6a2aa462
>> Step #0 - "dftester-image": Step 12/23 : RUN ls -la ${WORKDIR}
>> Step #0 - "dftester-image":  ---> Running in cf164108f9d6
>> Step #0 - "dftester-image": total 24
>> Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 .
>> Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 ..
>> Step #0 - "dftester-image": -rw-r--r-- 1 root root0 Jun 16 08:57
>> __init__.py
>> Step #0 - "dftester-image": -rw-r--r-- 1 root root  135 Jun 16 08:57
>> dataflow_tester.py
>> Step #0 - "dftester-image": drwxr-xr-x 2 root root 4096 Jun 16 08:59
>> mypackage
>> Step #0 - "dftester-image": -rw-r--r-- 1 root root   64 Jun 16 08:57
>> requirements.txt
>> Step #0 - "dftester-image": -rw-r--r-- 1 root root  736 Jun 16 08:57
>> setup.py
>> Step #0 - "dftester-image": Removing intermediate container cf164108f9d6
>> Step #0 - "dftester-image":  ---> eb1a080b7948
>> Step #0 - "dftester-image": Step 13/23 : RUN echo '--- listing modules
>> -'
>> Step #0 - "dftester-image":  ---> Running in 

Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-16 Thread XQ Hu via user
What is the error message now?
You can easily ssh to your docker container and check everything is
installed correctly by:
docker run --rm -it --entrypoint=/bin/bash $CUSTOM_CONTAINER_IMAGE


On Sun, Jun 16, 2024 at 5:18 AM Sofia’s World  wrote:

> Valentin, many thanks... i actually spotted the reference in teh setup file
> However , after correcting it, i am still at square 1 where somehow my
> runtime environment does not see it.. so i added some debugging to my
> Dockerfile to check if i forgot to copy something,
> and here's the output, where i can see the mypackage has been copied
>
> here's my directory structure
>
>  mypackage
> __init__.py
> obbutils.py
> launcher.py
> __init__.py
> dataflow_tester.py
> setup_dftester.py (copied to setup.py)
>
> i can see the directory structure has been maintained when i copy my files
> to docker as i added some debug to my dockerfile
>
> Step #0 - "dftester-image": Removing intermediate container 4c4e763289d2
> Step #0 - "dftester-image":  ---> cda378f70a9e
> Step #0 - "dftester-image": Step 6/23 : COPY requirements.txt .
> Step #0 - "dftester-image":  ---> 9a43da08b013
> Step #0 - "dftester-image": Step 7/23 : COPY setup_dftester.py setup.py
> Step #0 - "dftester-image":  ---> 5a6bf71df052
> Step #0 - "dftester-image": Step 8/23 : COPY dataflow_tester.py .
> Step #0 - "dftester-image":  ---> 82cfe1f1f9ed
> Step #0 - "dftester-image": Step 9/23 : COPY mypackage mypackage
> Step #0 - "dftester-image":  ---> d86497b791d0
> Step #0 - "dftester-image": Step 10/23 : COPY __init__.py
> ${WORKDIR}/__init__.py
> Step #0 - "dftester-image":  ---> 337d149d64c7
> Step #0 - "dftester-image": Step 11/23 : RUN echo '- listing workdir'
> Step #0 - "dftester-image":  ---> Running in 9d97d8a64319
> Step #0 - "dftester-image": - listing workdir
> Step #0 - "dftester-image": Removing intermediate container 9d97d8a64319
> Step #0 - "dftester-image":  ---> bc9a6a2aa462
> Step #0 - "dftester-image": Step 12/23 : RUN ls -la ${WORKDIR}
> Step #0 - "dftester-image":  ---> Running in cf164108f9d6
> Step #0 - "dftester-image": total 24
> Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 .
> Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 ..
> Step #0 - "dftester-image": -rw-r--r-- 1 root root0 Jun 16 08:57
> __init__.py
> Step #0 - "dftester-image": -rw-r--r-- 1 root root  135 Jun 16 08:57
> dataflow_tester.py
> Step #0 - "dftester-image": drwxr-xr-x 2 root root 4096 Jun 16 08:59
> mypackage
> Step #0 - "dftester-image": -rw-r--r-- 1 root root   64 Jun 16 08:57
> requirements.txt
> Step #0 - "dftester-image": -rw-r--r-- 1 root root  736 Jun 16 08:57
> setup.py
> Step #0 - "dftester-image": Removing intermediate container cf164108f9d6
> Step #0 - "dftester-image":  ---> eb1a080b7948
> Step #0 - "dftester-image": Step 13/23 : RUN echo '--- listing modules
> -'
> Step #0 - "dftester-image":  ---> Running in 884f03dd81d6
> Step #0 - "dftester-image": --- listing modules -
> Step #0 - "dftester-image": Removing intermediate container 884f03dd81d6
> Step #0 - "dftester-image":  ---> 9f6f7e27bd2f
> Step #0 - "dftester-image": Step 14/23 : RUN ls -la  ${WORKDIR}/mypackage
> Step #0 - "dftester-image":  ---> Running in bd74ade37010
> Step #0 - "dftester-image": total 16
> Step #0 - "dftester-image": drwxr-xr-x 2 root root 4096 Jun 16 08:59 .
> Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 ..
> Step #0 - "dftester-image": -rw-r--r-- 1 root root0 Jun 16 08:57
> __init__.py
> Step #0 - "dftester-image": -rw-r--r-- 1 root root 1442 Jun 16 08:57
> launcher.py
> Step #0 - "dftester-image": -rw-r--r-- 1 root root  607 Jun 16 08:57
> obb_utils.py
> Step #0 - "dftester-image": Removing intermediate container bd74ade37010
>
>
> i have this in my setup.py
>
> REQUIRED_PACKAGES = [
> 'openbb',
> "apache-beam[gcp]",  # Must match the version in `Dockerfile``.
> 'sendgrid',
> 'pandas_datareader',
> 'vaderSentiment',
> 'numpy',
> 'bs4',
> 'lxml',
> 'pandas_datareader',
> 'beautifulsoup4',
> 'xlrd',
> 'openpyxl'
> ]
>
>
> setuptools.setup(
> name='mypackage',
> version='0.0.1',
> description='Shres Runner Package.',
> install_requires=REQUIRED_PACKAGES,
> packages=setuptools.find_packages()
> )
>
>
> and this is my dataflow_tester.py
>
> from mypackage import launcher
> import logging
> if __name__ == '__main__':
>   logging.getLogger().setLevel(logging.INFO)
>   launcher.run()
>
>
>
> have compared my setup vs
> https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
> and all looks the same (apart from my copying the __init__.,py fromo the
> directory where the main file(dataflow_tester.py) resides
>
> Would you know how else can i debug what is going on and why my
> mypackages subdirectory is not being seen?
>
> Kind regars
>  Marco
>
>
>
>
> On Sat, Jun 15, 2024 at 7:27 PM Valentyn 

Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-16 Thread Sofia’s World
Valentyn, many thanks... I actually spotted the reference in the setup file.
However, after correcting it, I am still at square one, where somehow my
runtime environment does not see it, so I added some debugging to my
Dockerfile to check whether I forgot to copy something;
here's the output, where I can see that mypackage has been copied.

here's my directory structure

 mypackage
__init__.py
obbutils.py
launcher.py
__init__.py
dataflow_tester.py
setup_dftester.py (copied to setup.py)

I can see the directory structure has been maintained when I copy my files
to Docker, as I added some debug output to my Dockerfile:

Step #0 - "dftester-image": Removing intermediate container 4c4e763289d2
Step #0 - "dftester-image":  ---> cda378f70a9e
Step #0 - "dftester-image": Step 6/23 : COPY requirements.txt .
Step #0 - "dftester-image":  ---> 9a43da08b013
Step #0 - "dftester-image": Step 7/23 : COPY setup_dftester.py setup.py
Step #0 - "dftester-image":  ---> 5a6bf71df052
Step #0 - "dftester-image": Step 8/23 : COPY dataflow_tester.py .
Step #0 - "dftester-image":  ---> 82cfe1f1f9ed
Step #0 - "dftester-image": Step 9/23 : COPY mypackage mypackage
Step #0 - "dftester-image":  ---> d86497b791d0
Step #0 - "dftester-image": Step 10/23 : COPY __init__.py
${WORKDIR}/__init__.py
Step #0 - "dftester-image":  ---> 337d149d64c7
Step #0 - "dftester-image": Step 11/23 : RUN echo '- listing workdir'
Step #0 - "dftester-image":  ---> Running in 9d97d8a64319
Step #0 - "dftester-image": - listing workdir
Step #0 - "dftester-image": Removing intermediate container 9d97d8a64319
Step #0 - "dftester-image":  ---> bc9a6a2aa462
Step #0 - "dftester-image": Step 12/23 : RUN ls -la ${WORKDIR}
Step #0 - "dftester-image":  ---> Running in cf164108f9d6
Step #0 - "dftester-image": total 24
Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 .
Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 ..
Step #0 - "dftester-image": -rw-r--r-- 1 root root0 Jun 16 08:57
__init__.py
Step #0 - "dftester-image": -rw-r--r-- 1 root root  135 Jun 16 08:57
dataflow_tester.py
Step #0 - "dftester-image": drwxr-xr-x 2 root root 4096 Jun 16 08:59
mypackage
Step #0 - "dftester-image": -rw-r--r-- 1 root root   64 Jun 16 08:57
requirements.txt
Step #0 - "dftester-image": -rw-r--r-- 1 root root  736 Jun 16 08:57
setup.py
Step #0 - "dftester-image": Removing intermediate container cf164108f9d6
Step #0 - "dftester-image":  ---> eb1a080b7948
Step #0 - "dftester-image": Step 13/23 : RUN echo '--- listing modules
-'
Step #0 - "dftester-image":  ---> Running in 884f03dd81d6
Step #0 - "dftester-image": --- listing modules -
Step #0 - "dftester-image": Removing intermediate container 884f03dd81d6
Step #0 - "dftester-image":  ---> 9f6f7e27bd2f
Step #0 - "dftester-image": Step 14/23 : RUN ls -la  ${WORKDIR}/mypackage
Step #0 - "dftester-image":  ---> Running in bd74ade37010
Step #0 - "dftester-image": total 16
Step #0 - "dftester-image": drwxr-xr-x 2 root root 4096 Jun 16 08:59 .
Step #0 - "dftester-image": drwxr-xr-x 1 root root 4096 Jun 16 08:59 ..
Step #0 - "dftester-image": -rw-r--r-- 1 root root0 Jun 16 08:57
__init__.py
Step #0 - "dftester-image": -rw-r--r-- 1 root root 1442 Jun 16 08:57
launcher.py
Step #0 - "dftester-image": -rw-r--r-- 1 root root  607 Jun 16 08:57
obb_utils.py
Step #0 - "dftester-image": Removing intermediate container bd74ade37010


I have this in my setup.py:

REQUIRED_PACKAGES = [
'openbb',
"apache-beam[gcp]",  # Must match the version in `Dockerfile``.
'sendgrid',
'pandas_datareader',
'vaderSentiment',
'numpy',
'bs4',
'lxml',
'pandas_datareader',
'beautifulsoup4',
'xlrd',
'openpyxl'
]


setuptools.setup(
name='mypackage',
version='0.0.1',
description='Shres Runner Package.',
install_requires=REQUIRED_PACKAGES,
packages=setuptools.find_packages()
)


and this is my dataflow_tester.py

from mypackage import launcher
import logging
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  launcher.run()



I have compared my setup vs
https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
and all looks the same (apart from my copying the __init__.py from the
directory where the main file (dataflow_tester.py) resides).

Would you know how else I can debug what is going on and why my mypackage
subdirectory is not being seen?

Kind regards
 Marco




On Sat, Jun 15, 2024 at 7:27 PM Valentyn Tymofieiev via user <
user@beam.apache.org> wrote:

> Your pipeline launcher refers to a package named 'modules', but this
> package is not available in the runtime environment.
>
> On Sat, Jun 15, 2024 at 11:17 AM Sofia’s World 
> wrote:
>
>> Sorry, i cheered up too early
>> i can successfully build the image however, at runtime the code fails
>> always with this exception and i cannot figure out why
>>
>> i mimicked the sample directory structure
>>
>>
>>  mypackage

Re: Apache Beam on GCP / Forcing to use py 3.11

2024-06-15 Thread Valentyn Tymofieiev via user
Your pipeline launcher refers to a package named 'modules', but this
package is not available in the runtime environment.

On Sat, Jun 15, 2024 at 11:17 AM Sofia’s World  wrote:

> Sorry, i cheered up too early
> i can successfully build the image however, at runtime the code fails
> always with this exception and i cannot figure out why
>
> i mimicked the sample directory structure
>
>
>  mypackage
>--- __init__,py
>dftester.py
>obb_utils.py
>
> dataflow_tester_main.py
>
> this is the content of my dataflow_tester_main.py
>
> from mypackage import dftester
> import logging
> if __name__ == '__main__':
>   logging.getLogger().setLevel(logging.INFO)
>   dftester.run()
>
>
> and this is my dockerfile
>
>
> https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/shareloader/Dockerfile_tester
>
> and at the bottom if this email my exception
> I am puzzled on where the error is coming from as i have almost copied
> this sample
> https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/dataflow/flex-templates/pipeline_with_dependencies/main.py
>
> thanks and regards
>  Marco
>
>
>
>
>
>
>
>
>
>
>
> Traceback (most recent call last): File
> "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
> line 115, in create_harness _load_main_session(semi_persistent_directory)
> File
> "/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
> line 354, in _load_main_session pickler.load_session(session_file) File
> "/usr/local/lib/python3.11/site-packages/apache_beam/internal/pickler.py",
> line 65, in load_session return desired_pickle_lib.load_session(file_path)
> ^^ File
> "/usr/local/lib/python3.11/site-packages/apache_beam/internal/dill_pickler.py",
> line 446, in load_session return dill.load_session(file_path)
>  File
> "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 368, in
> load_session module = unpickler.load()  File
> "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 472, in load
> obj = StockUnpickler.load(self) ^ File
> "/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 462, in
> find_class return StockUnpickler.find_class(self, module, name)
> ^ ModuleNotFoundError: No
> module named 'modules'
>
>
>
>
>
>
>
> On Fri, Jun 14, 2024 at 5:52 AM Sofia’s World  wrote:
>
>> Many thanks Hu, worked like a charm
>>
>> few qq
>> so in my reqs.txt i should put all beam requirements PLUS my own?
>>
>> and in the setup.py, shall i just declare
>>
>> "apache-beam[gcp]==2.54.0",  # Must match the version in `Dockerfile``.
>>
>> thanks and kind regards
>> Marco
>>
>>
>>
>>
>>
>>
>> On Wed, Jun 12, 2024 at 1:48 PM XQ Hu  wrote:
>>
>>> Any reason to use this?
>>>
>>> RUN pip install avro-python3 pyarrow==0.15.1 apache-beam[gcp]==2.30.0
>>>  pandas-datareader==0.9.0
>>>
>>> It is typically recommended to use the latest Beam and build the docker
>>> image using the requirements released for each Beam, for example,
>>> https://github.com/apache/beam/blob/release-2.56.0/sdks/python/container/py311/base_image_requirements.txt
>>>
>>> On Wed, Jun 12, 2024 at 1:31 AM Sofia’s World 
>>> wrote:
>>>
 Sure, apologies, it crossed my mind it would have been useful to refert
 to it

 so this is the docker file


 https://github.com/mmistroni/GCP_Experiments/edit/master/dataflow/shareloader/Dockerfile_tester

 I was using a setup.py as well, but then i commented out the usage in
 the dockerfile after checking some flex templates which said it is not
 needed


 https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/shareloader/setup_dftester.py

 thanks in advance
  Marco







 On Tue, Jun 11, 2024 at 10:54 PM XQ Hu  wrote:

> Can you share your Dockerfile?
>
> On Tue, Jun 11, 2024 at 4:43 PM Sofia’s World 
> wrote:
>
>> thanks all,  it seemed to work but now i am getting a different
>> problem, having issues in building pyarrow...
>>
>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":  
>>  :36: DeprecationWarning: pkg_resources is deprecated as an API. 
>> See https://setuptools.pypa.io/en/latest/pkg_resources.html
>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":  
>>  WARNING setuptools_scm.pyproject_reading toml section missing 
>> 'pyproject.toml does not contain a tool.setuptools_scm section'
>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":  
>>  Traceback (most recent call last):
>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":  
>>File 
>> 

Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-15 Thread Sofia’s World
Sorry, I celebrated too early.
I can successfully build the image; however, at runtime the code always fails
with the exception below, and I cannot figure out why.

I mimicked the sample directory structure:


 mypackage
   --- __init__,py
   dftester.py
   obb_utils.py

dataflow_tester_main.py

this is the content of my dataflow_tester_main.py

from mypackage import dftester
import logging
if __name__ == '__main__':
  logging.getLogger().setLevel(logging.INFO)
  dftester.run()


and this is my dockerfile

https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/shareloader/Dockerfile_tester

My exception is at the bottom of this email.
I am puzzled about where the error is coming from, as I have almost copied this
sample:
https://github.com/GoogleCloudPlatform/python-docs-samples/blob/main/dataflow/flex-templates/pipeline_with_dependencies/main.py

thanks and regards
 Marco











Traceback (most recent call last): File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
line 115, in create_harness _load_main_session(semi_persistent_directory)
File
"/usr/local/lib/python3.11/site-packages/apache_beam/runners/worker/sdk_worker_main.py",
line 354, in _load_main_session pickler.load_session(session_file) File
"/usr/local/lib/python3.11/site-packages/apache_beam/internal/pickler.py",
line 65, in load_session return desired_pickle_lib.load_session(file_path)
^^ File
"/usr/local/lib/python3.11/site-packages/apache_beam/internal/dill_pickler.py",
line 446, in load_session return dill.load_session(file_path)
 File
"/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 368, in
load_session module = unpickler.load()  File
"/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 472, in load
obj = StockUnpickler.load(self) ^ File
"/usr/local/lib/python3.11/site-packages/dill/_dill.py", line 462, in
find_class return StockUnpickler.find_class(self, module, name)
^ ModuleNotFoundError: No
module named 'modules'







On Fri, Jun 14, 2024 at 5:52 AM Sofia’s World  wrote:

> Many thanks Hu, worked like a charm
>
> few qq
> so in my reqs.txt i should put all beam requirements PLUS my own?
>
> and in the setup.py, shall i just declare
>
> "apache-beam[gcp]==2.54.0",  # Must match the version in `Dockerfile``.
>
> thanks and kind regards
> Marco
>
>
>
>
>
>
> On Wed, Jun 12, 2024 at 1:48 PM XQ Hu  wrote:
>
>> Any reason to use this?
>>
>> RUN pip install avro-python3 pyarrow==0.15.1 apache-beam[gcp]==2.30.0
>>  pandas-datareader==0.9.0
>>
>> It is typically recommended to use the latest Beam and build the docker
>> image using the requirements released for each Beam, for example,
>> https://github.com/apache/beam/blob/release-2.56.0/sdks/python/container/py311/base_image_requirements.txt
>>
>> On Wed, Jun 12, 2024 at 1:31 AM Sofia’s World 
>> wrote:
>>
>>> Sure, apologies, it crossed my mind it would have been useful to refert
>>> to it
>>>
>>> so this is the docker file
>>>
>>>
>>> https://github.com/mmistroni/GCP_Experiments/edit/master/dataflow/shareloader/Dockerfile_tester
>>>
>>> I was using a setup.py as well, but then i commented out the usage in
>>> the dockerfile after checking some flex templates which said it is not
>>> needed
>>>
>>>
>>> https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/shareloader/setup_dftester.py
>>>
>>> thanks in advance
>>>  Marco
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jun 11, 2024 at 10:54 PM XQ Hu  wrote:
>>>
 Can you share your Dockerfile?

 On Tue, Jun 11, 2024 at 4:43 PM Sofia’s World 
 wrote:

> thanks all,  it seemed to work but now i am getting a different
> problem, having issues in building pyarrow...
>
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> :36: DeprecationWarning: pkg_resources is deprecated as an API. 
> See https://setuptools.pypa.io/en/latest/pkg_resources.html
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> WARNING setuptools_scm.pyproject_reading toml section missing 
> 'pyproject.toml does not contain a tool.setuptools_scm section'
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> Traceback (most recent call last):
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>   File 
> "/tmp/pip-build-env-meihcxsp/overlay/lib/python3.11/site-packages/setuptools_scm/_integration/pyproject_reading.py",
>  line 36, in read_pyproject
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> section = defn.get("tool", {})[tool_name]
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>   ^^^
> Step #0 - "build-shareloader-template": Step #4 - 

Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-14 Thread Valentyn Tymofieiev via user
I recommend putting all top-level dependencies for your pipeline in the
setup.py install_requires section and autogenerating the requirements.txt,
which will then include all transitive dependencies and ensure reproducible
builds.

For approaches to generate the requirements.txt file from top level
requirements specified in the setup.py file, see:
https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies#optional-update-the-dependencies-in-the-requirements-file-and-rebuild-the-docker-images
.

Valentyn

On Thu, Jun 13, 2024 at 9:52 PM Sofia’s World  wrote:

> Many thanks Hu, worked like a charm
>
> few qq
> so in my reqs.txt i should put all beam requirements PLUS my own?
>
> and in the setup.py, shall i just declare
>
> "apache-beam[gcp]==2.54.0",  # Must match the version in `Dockerfile``.
>
> thanks and kind regards
> Marco
>
>
>
>
>
>
> On Wed, Jun 12, 2024 at 1:48 PM XQ Hu  wrote:
>
>> Any reason to use this?
>>
>> RUN pip install avro-python3 pyarrow==0.15.1 apache-beam[gcp]==2.30.0
>>  pandas-datareader==0.9.0
>>
>> It is typically recommended to use the latest Beam and build the docker
>> image using the requirements released for each Beam, for example,
>> https://github.com/apache/beam/blob/release-2.56.0/sdks/python/container/py311/base_image_requirements.txt
>>
>> On Wed, Jun 12, 2024 at 1:31 AM Sofia’s World 
>> wrote:
>>
>>> Sure, apologies, it crossed my mind it would have been useful to refert
>>> to it
>>>
>>> so this is the docker file
>>>
>>>
>>> https://github.com/mmistroni/GCP_Experiments/edit/master/dataflow/shareloader/Dockerfile_tester
>>>
>>> I was using a setup.py as well, but then i commented out the usage in
>>> the dockerfile after checking some flex templates which said it is not
>>> needed
>>>
>>>
>>> https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/shareloader/setup_dftester.py
>>>
>>> thanks in advance
>>>  Marco
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jun 11, 2024 at 10:54 PM XQ Hu  wrote:
>>>
 Can you share your Dockerfile?

 On Tue, Jun 11, 2024 at 4:43 PM Sofia’s World 
 wrote:

> thanks all,  it seemed to work but now i am getting a different
> problem, having issues in building pyarrow...
>
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> :36: DeprecationWarning: pkg_resources is deprecated as an API. 
> See https://setuptools.pypa.io/en/latest/pkg_resources.html
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> WARNING setuptools_scm.pyproject_reading toml section missing 
> 'pyproject.toml does not contain a tool.setuptools_scm section'
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> Traceback (most recent call last):
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>   File 
> "/tmp/pip-build-env-meihcxsp/overlay/lib/python3.11/site-packages/setuptools_scm/_integration/pyproject_reading.py",
>  line 36, in read_pyproject
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> section = defn.get("tool", {})[tool_name]
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>   ^^^
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> KeyError: 'setuptools_scm'
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> running bdist_wheel
>
>
>
>
> It is somehow getting messed up with a toml ?
>
>
> Could anyone advise?
>
> thanks
>
>  Marco
>
>
>
>
>
> On Tue, Jun 11, 2024 at 1:00 AM XQ Hu via user 
> wrote:
>
>>
>> https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
>> is a great example.
>>
>> On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
>> user@beam.apache.org> wrote:
>>
>>> In this case the Python version will be defined by the Python
>>> version installed in the docker image of your flex template. So, you'd
>>> have to build your flex template from a base image with Python 3.11.
>>>
>>> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World 
>>> wrote:
>>>
 Hello
  no i am running my pipelien on  GCP directly via a flex template,
 configured using a Docker file
 Any chances to do something in the Dockerfile to force the version
 at runtime?
 Thanks

 On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
 user@beam.apache.org> wrote:

> Hello,
>
> Are you running your pipeline from the python 3.11 environment?
> If you are running from a python 3.11 environment and don't use a 
> custom
> docker 

Re: How windowing is implemented on Flink runner

2024-06-14 Thread Wiśniowski Piotr

Hi,

Wanted to follow up, as I had a similar case.

So does this mean it is OK for Beam to use a sliding window of 1 day with a 
1-second period (using a trigger other than the after-watermark trigger to 
avoid outputting data from every window), and that there is no additional 
performance penalty (duplicating input messages in storage, or CPU for 
resolving windows)? This is interesting from both the Flink and Dataflow 
perspectives (both Python and Java).


I ended up implementing the logic with Beam state and timers (which is 
quite performant and readable), but I am also interested in other possibilities.
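
For readers who want a sense of what that state-and-timers approach looks like, here is a minimal Python sketch; it assumes keyed integer input and a flat 60-second processing-time flush, and the names are illustrative only (it is not the code referred to above):

```python
import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer
from apache_beam.utils.timestamp import Duration, Timestamp


class RollingSum(beam.DoFn):
    """Buffers values per key and emits an aggregate when a processing-time timer fires."""
    BUFFER = BagStateSpec('buffer', VarIntCoder())
    FLUSH = TimerSpec('flush', TimeDomain.REAL_TIME)

    def process(self,
                element,
                buffer=beam.DoFn.StateParam(BUFFER),
                flush=beam.DoFn.TimerParam(FLUSH)):
        _, value = element                               # stateful DoFns need (key, value) input
        buffer.add(value)
        flush.set(Timestamp.now() + Duration(seconds=60))

    @on_timer(FLUSH)
    def on_flush(self,
                 key=beam.DoFn.KeyParam,
                 buffer=beam.DoFn.StateParam(BUFFER)):
        values = list(buffer.read())
        buffer.clear()
        yield key, sum(values)

# Usage: keyed_integers | beam.ParDo(RollingSum())
```

Unlike a 1-day sliding window with a 1-second period, each element is held once per key rather than once per overlapping window, which is usually where the performance win comes from.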


Best

Wiśniowski Piotr

On 12.06.2024 21:50, Ruben Vargas wrote:

I imagined it but wasn't sure!

Thanks for the clarification!

On Wed, Jun 12, 2024 at 1:42 PM Robert Bradshaw via user
 wrote:

Beam implements Windowing itself (via state and timers) rather than
deferring to Flink's implementation.

On Wed, Jun 12, 2024 at 11:55 AM Ruben Vargas  wrote:

Hello guys

May be a silly question,

But in the Flink runner, the window implementation uses the Flink
windowing? Does that mean the runner will have performance issues like
Flink itself? see this:
https://issues.apache.org/jira/browse/FLINK-7001

I'm asking because I see the issue, it mentions different concepts
that Beam already handles at the API level. So my suspicion is that
the Beam model handles windowing a little differently from the pure
Flink app. But I'm not sure..


Regards.


Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-13 Thread Sofia’s World
Many thanks Hu, that worked like a charm.

A few quick questions:
In my reqs.txt, should I put all the Beam requirements PLUS my own?

And in the setup.py, shall I just declare

"apache-beam[gcp]==2.54.0",  # Must match the version in `Dockerfile``.

thanks and kind regards
Marco






On Wed, Jun 12, 2024 at 1:48 PM XQ Hu  wrote:

> Any reason to use this?
>
> RUN pip install avro-python3 pyarrow==0.15.1 apache-beam[gcp]==2.30.0
>  pandas-datareader==0.9.0
>
> It is typically recommended to use the latest Beam and build the docker
> image using the requirements released for each Beam, for example,
> https://github.com/apache/beam/blob/release-2.56.0/sdks/python/container/py311/base_image_requirements.txt
>
> On Wed, Jun 12, 2024 at 1:31 AM Sofia’s World  wrote:
>
>> Sure, apologies, it crossed my mind it would have been useful to refert
>> to it
>>
>> so this is the docker file
>>
>>
>> https://github.com/mmistroni/GCP_Experiments/edit/master/dataflow/shareloader/Dockerfile_tester
>>
>> I was using a setup.py as well, but then i commented out the usage in the
>> dockerfile after checking some flex templates which said it is not needed
>>
>>
>> https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/shareloader/setup_dftester.py
>>
>> thanks in advance
>>  Marco
>>
>>
>>
>>
>>
>>
>>
>> On Tue, Jun 11, 2024 at 10:54 PM XQ Hu  wrote:
>>
>>> Can you share your Dockerfile?
>>>
>>> On Tue, Jun 11, 2024 at 4:43 PM Sofia’s World 
>>> wrote:
>>>
 thanks all,  it seemed to work but now i am getting a different
 problem, having issues in building pyarrow...

 Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
 :36: DeprecationWarning: pkg_resources is deprecated as an API. 
 See https://setuptools.pypa.io/en/latest/pkg_resources.html
 Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
 WARNING setuptools_scm.pyproject_reading toml section missing 
 'pyproject.toml does not contain a tool.setuptools_scm section'
 Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
 Traceback (most recent call last):
 Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
  File 
 "/tmp/pip-build-env-meihcxsp/overlay/lib/python3.11/site-packages/setuptools_scm/_integration/pyproject_reading.py",
  line 36, in read_pyproject
 Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
section = defn.get("tool", {})[tool_name]
 Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
  ^^^
 Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
 KeyError: 'setuptools_scm'
 Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
 running bdist_wheel




 It is somehow getting messed up with a toml ?


 Could anyone advise?

 thanks

  Marco





 On Tue, Jun 11, 2024 at 1:00 AM XQ Hu via user 
 wrote:

>
> https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
> is a great example.
>
> On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
> user@beam.apache.org> wrote:
>
>> In this case the Python version will be defined by the Python version
>> installed in the docker image of your flex template. So, you'd have to
>> build your flex template from a base image with Python 3.11.
>>
>> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World 
>> wrote:
>>
>>> Hello
>>>  no i am running my pipelien on  GCP directly via a flex template,
>>> configured using a Docker file
>>> Any chances to do something in the Dockerfile to force the version
>>> at runtime?
>>> Thanks
>>>
>>> On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
>>> user@beam.apache.org> wrote:
>>>
 Hello,

 Are you running your pipeline from the python 3.11 environment?  If
 you are running from a python 3.11 environment and don't use a custom
 docker container image, DataflowRunner(Assuming Apache Beam on GCP 
 means
 Apache Beam on DataflowRunner), will use Python 3.11.

 Thanks,
 Anand

>>>


Re: How windowing is implemented on Flink runner

2024-06-12 Thread Ruben Vargas
I imagined it but wasn't sure!

Thanks for the clarification!

On Wed, Jun 12, 2024 at 1:42 PM Robert Bradshaw via user
 wrote:
>
> Beam implements Windowing itself (via state and timers) rather than
> deferring to Flink's implementation.
>
> On Wed, Jun 12, 2024 at 11:55 AM Ruben Vargas  wrote:
> >
> > Hello guys
> >
> > May be a silly question,
> >
> > But in the Flink runner, the window implementation uses the Flink
> > windowing? Does that mean the runner will have performance issues like
> > Flink itself? see this:
> > https://issues.apache.org/jira/browse/FLINK-7001
> >
> > I'm asking because I see the issue, it mentions different concepts
> > that Beam already handles at the API level. So my suspicion is that
> > the Beam model handles windowing a little differently from the pure
> > Flink app. But I'm not sure..
> >
> >
> > Regards.


Re: How windowing is implemented on Flink runner

2024-06-12 Thread Robert Bradshaw via user
Beam implements Windowing itself (via state and timers) rather than
deferring to Flink's implementation.

On Wed, Jun 12, 2024 at 11:55 AM Ruben Vargas  wrote:
>
> Hello guys
>
> May be a silly question,
>
> But in the Flink runner, the window implementation uses the Flink
> windowing? Does that mean the runner will have performance issues like
> Flink itself? see this:
> https://issues.apache.org/jira/browse/FLINK-7001
>
> I'm asking because I see the issue, it mentions different concepts
> that Beam already handles at the API level. So my suspicion is that
> the Beam model handles windowing a little differently from the pure
> Flink app. But I'm not sure..
>
>
> Regards.


Re: Paralalelism of a side input

2024-06-12 Thread Robert Bradshaw via user
On Wed, Jun 12, 2024 at 7:56 AM Ruben Vargas  wrote:
>
> The approach looks good. but one question
>
> My understanding is that this will schedule for example 8 operators across 
> the workers, but only one of them will be processing, the others remain idle? 
> Are those consuming resources in some way? I'm assuming may be is not 
> significant.

That is correct, but the resources consumed by an idle operator should
be negligible.

> Thanks.
>
> El El vie, 7 de jun de 2024 a la(s) 3:56 p.m., Robert Bradshaw via user 
>  escribió:
>>
>> You can always limit the parallelism by assigning a single key to
>> every element and then doing a grouping or reshuffle[1] on that key
>> before processing the elements. Even if the operator parallelism for
>> that step is technically, say, eight, your effective parallelism will
>> be exactly one.
>>
>> [1] 
>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html
>>
>> On Fri, Jun 7, 2024 at 2:13 PM Ruben Vargas  wrote:
>> >
>> > Hello guys
>> >
>> > One question, I have a side input which fetches an endpoint each 30
>> > min, I pretty much copied the example here:
>> > https://beam.apache.org/documentation/patterns/side-inputs/ but added
>> > some logic to fetch the endpoint and parse the payload.
>> >
>> > My question is: it is possible to control the parallelism of this
>> > single ParDo that does the fetch/transform? I don't think I need a lot
>> > of parallelism for that one. I'm currently using flink runner and I
>> > see the parallelism is 8 (which is the general parallelism for my
>> > flink cluster).
>> >
>> > Is it possible to set it to 1 for example?
>> >
>> >
>> > Regards.


Re: Paralalelism of a side input

2024-06-12 Thread Ruben Vargas
The approach looks good, but one question:

My understanding is that this will schedule, for example, 8 operators across
the workers, but only one of them will be processing while the others
remain idle? Are those consuming resources in some way? I'm assuming it may
not be significant.

Thanks.

El El vie, 7 de jun de 2024 a la(s) 3:56 p.m., Robert Bradshaw via user <
user@beam.apache.org> escribió:

> You can always limit the parallelism by assigning a single key to
> every element and then doing a grouping or reshuffle[1] on that key
> before processing the elements. Even if the operator parallelism for
> that step is technically, say, eight, your effective parallelism will
> be exactly one.
>
> [1]
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html
>
> On Fri, Jun 7, 2024 at 2:13 PM Ruben Vargas 
> wrote:
> >
> > Hello guys
> >
> > One question, I have a side input which fetches an endpoint each 30
> > min, I pretty much copied the example here:
> > https://beam.apache.org/documentation/patterns/side-inputs/ but added
> > some logic to fetch the endpoint and parse the payload.
> >
> > My question is: it is possible to control the parallelism of this
> > single ParDo that does the fetch/transform? I don't think I need a lot
> > of parallelism for that one. I'm currently using flink runner and I
> > see the parallelism is 8 (which is the general parallelism for my
> > flink cluster).
> >
> > Is it possible to set it to 1 for example?
> >
> >
> > Regards.
>


Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-12 Thread XQ Hu via user
Any reason to use this?

RUN pip install avro-python3 pyarrow==0.15.1 apache-beam[gcp]==2.30.0
 pandas-datareader==0.9.0

It is typically recommended to use the latest Beam and build the Docker
image using the requirements released for each Beam version, for example,
https://github.com/apache/beam/blob/release-2.56.0/sdks/python/container/py311/base_image_requirements.txt

On Wed, Jun 12, 2024 at 1:31 AM Sofia’s World  wrote:

> Sure, apologies, it crossed my mind it would have been useful to refert to
> it
>
> so this is the docker file
>
>
> https://github.com/mmistroni/GCP_Experiments/edit/master/dataflow/shareloader/Dockerfile_tester
>
> I was using a setup.py as well, but then i commented out the usage in the
> dockerfile after checking some flex templates which said it is not needed
>
>
> https://github.com/mmistroni/GCP_Experiments/blob/master/dataflow/shareloader/setup_dftester.py
>
> thanks in advance
>  Marco
>
>
>
>
>
>
>
> On Tue, Jun 11, 2024 at 10:54 PM XQ Hu  wrote:
>
>> Can you share your Dockerfile?
>>
>> On Tue, Jun 11, 2024 at 4:43 PM Sofia’s World 
>> wrote:
>>
>>> thanks all,  it seemed to work but now i am getting a different problem,
>>> having issues in building pyarrow...
>>>
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> :36: DeprecationWarning: pkg_resources is deprecated as an API. See 
>>> https://setuptools.pypa.io/en/latest/pkg_resources.html
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> WARNING setuptools_scm.pyproject_reading toml section missing 
>>> 'pyproject.toml does not contain a tool.setuptools_scm section'
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> Traceback (most recent call last):
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image": 
>>> File 
>>> "/tmp/pip-build-env-meihcxsp/overlay/lib/python3.11/site-packages/setuptools_scm/_integration/pyproject_reading.py",
>>>  line 36, in read_pyproject
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image": 
>>>   section = defn.get("tool", {})[tool_name]
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image": 
>>> ^^^
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> KeyError: 'setuptools_scm'
>>> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>>> running bdist_wheel
>>>
>>>
>>>
>>>
>>> It is somehow getting messed up with a toml ?
>>>
>>>
>>> Could anyone advise?
>>>
>>> thanks
>>>
>>>  Marco
>>>
>>>
>>>
>>>
>>>
>>> On Tue, Jun 11, 2024 at 1:00 AM XQ Hu via user 
>>> wrote:
>>>

 https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
 is a great example.

 On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
 user@beam.apache.org> wrote:

> In this case the Python version will be defined by the Python version
> installed in the docker image of your flex template. So, you'd have to
> build your flex template from a base image with Python 3.11.
>
> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World 
> wrote:
>
>> Hello
>>  no i am running my pipelien on  GCP directly via a flex template,
>> configured using a Docker file
>> Any chances to do something in the Dockerfile to force the version at
>> runtime?
>> Thanks
>>
>> On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
>> user@beam.apache.org> wrote:
>>
>>> Hello,
>>>
>>> Are you running your pipeline from the python 3.11 environment?  If
>>> you are running from a python 3.11 environment and don't use a custom
>>> docker container image, DataflowRunner(Assuming Apache Beam on GCP means
>>> Apache Beam on DataflowRunner), will use Python 3.11.
>>>
>>> Thanks,
>>> Anand
>>>
>>


Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-11 Thread XQ Hu via user
Can you share your Dockerfile?

On Tue, Jun 11, 2024 at 4:43 PM Sofia’s World  wrote:

> thanks all,  it seemed to work but now i am getting a different problem,
> having issues in building pyarrow...
>
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> :36: DeprecationWarning: pkg_resources is deprecated as an API. See 
> https://setuptools.pypa.io/en/latest/pkg_resources.html
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> WARNING setuptools_scm.pyproject_reading toml section missing 'pyproject.toml 
> does not contain a tool.setuptools_scm section'
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> Traceback (most recent call last):
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image": 
> File 
> "/tmp/pip-build-env-meihcxsp/overlay/lib/python3.11/site-packages/setuptools_scm/_integration/pyproject_reading.py",
>  line 36, in read_pyproject
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> section = defn.get("tool", {})[tool_name]
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
>   ^^^
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> KeyError: 'setuptools_scm'
> Step #0 - "build-shareloader-template": Step #4 - "dftester-image":   
> running bdist_wheel
>
>
>
>
> It is somehow getting messed up with a toml ?
>
>
> Could anyone advise?
>
> thanks
>
>  Marco
>
>
>
>
>
> On Tue, Jun 11, 2024 at 1:00 AM XQ Hu via user 
> wrote:
>
>>
>> https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
>> is a great example.
>>
>> On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
>> user@beam.apache.org> wrote:
>>
>>> In this case the Python version will be defined by the Python version
>>> installed in the docker image of your flex template. So, you'd have to
>>> build your flex template from a base image with Python 3.11.
>>>
>>> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World 
>>> wrote:
>>>
 Hello
  no i am running my pipelien on  GCP directly via a flex template,
 configured using a Docker file
 Any chances to do something in the Dockerfile to force the version at
 runtime?
 Thanks

 On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
 user@beam.apache.org> wrote:

> Hello,
>
> Are you running your pipeline from the python 3.11 environment?  If
> you are running from a python 3.11 environment and don't use a custom
> docker container image, DataflowRunner(Assuming Apache Beam on GCP means
> Apache Beam on DataflowRunner), will use Python 3.11.
>
> Thanks,
> Anand
>



Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-11 Thread Sofia’s World
Thanks all, it seemed to work, but now I am getting a different problem:
I am having issues building pyarrow...

Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
   :36: DeprecationWarning: pkg_resources is deprecated as an
API. See https://setuptools.pypa.io/en/latest/pkg_resources.html
Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
   WARNING setuptools_scm.pyproject_reading toml section missing
'pyproject.toml does not contain a tool.setuptools_scm section'
Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
   Traceback (most recent call last):
Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
 File 
"/tmp/pip-build-env-meihcxsp/overlay/lib/python3.11/site-packages/setuptools_scm/_integration/pyproject_reading.py",
line 36, in read_pyproject
Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
   section = defn.get("tool", {})[tool_name]
Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
 ^^^
Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
   KeyError: 'setuptools_scm'
Step #0 - "build-shareloader-template": Step #4 - "dftester-image":
   running bdist_wheel




It is somehow getting messed up with a TOML file?


Could anyone advise?

thanks

 Marco





On Tue, Jun 11, 2024 at 1:00 AM XQ Hu via user  wrote:

>
> https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
> is a great example.
>
> On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
> user@beam.apache.org> wrote:
>
>> In this case the Python version will be defined by the Python version
>> installed in the docker image of your flex template. So, you'd have to
>> build your flex template from a base image with Python 3.11.
>>
>> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World 
>> wrote:
>>
>>> Hello
>>>  no i am running my pipelien on  GCP directly via a flex template,
>>> configured using a Docker file
>>> Any chances to do something in the Dockerfile to force the version at
>>> runtime?
>>> Thanks
>>>
>>> On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
>>> user@beam.apache.org> wrote:
>>>
 Hello,

 Are you running your pipeline from the python 3.11 environment?  If you
 are running from a python 3.11 environment and don't use a custom docker
 container image, DataflowRunner(Assuming Apache Beam on GCP means Apache
 Beam on DataflowRunner), will use Python 3.11.

 Thanks,
 Anand

>>>


Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-10 Thread XQ Hu via user
https://github.com/GoogleCloudPlatform/python-docs-samples/tree/main/dataflow/flex-templates/pipeline_with_dependencies
is a great example.

On Mon, Jun 10, 2024 at 4:28 PM Valentyn Tymofieiev via user <
user@beam.apache.org> wrote:

> In this case the Python version will be defined by the Python version
> installed in the docker image of your flex template. So, you'd have to
> build your flex template from a base image with Python 3.11.
>
> On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World 
> wrote:
>
>> Hello
>>  no i am running my pipelien on  GCP directly via a flex template,
>> configured using a Docker file
>> Any chances to do something in the Dockerfile to force the version at
>> runtime?
>> Thanks
>>
>> On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
>> user@beam.apache.org> wrote:
>>
>>> Hello,
>>>
>>> Are you running your pipeline from the python 3.11 environment?  If you
>>> are running from a python 3.11 environment and don't use a custom docker
>>> container image, DataflowRunner(Assuming Apache Beam on GCP means Apache
>>> Beam on DataflowRunner), will use Python 3.11.
>>>
>>> Thanks,
>>> Anand
>>>
>>


Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-10 Thread Valentyn Tymofieiev via user
In this case the Python version will be defined by the Python version
installed in the docker image of your flex template. So, you'd have to
build your flex template from a base image with Python 3.11.

On Mon, Jun 10, 2024 at 12:50 PM Sofia’s World  wrote:

> Hello
>  no i am running my pipelien on  GCP directly via a flex template,
> configured using a Docker file
> Any chances to do something in the Dockerfile to force the version at
> runtime?
> Thanks
>
> On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user <
> user@beam.apache.org> wrote:
>
>> Hello,
>>
>> Are you running your pipeline from the python 3.11 environment?  If you
>> are running from a python 3.11 environment and don't use a custom docker
>> container image, DataflowRunner(Assuming Apache Beam on GCP means Apache
>> Beam on DataflowRunner), will use Python 3.11.
>>
>> Thanks,
>> Anand
>>
>


Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-10 Thread Sofia’s World
Hello
 No, I am running my pipeline on GCP directly via a flex template,
configured using a Dockerfile.
Is there any chance to do something in the Dockerfile to force the version at
runtime?
Thanks

On Mon, Jun 10, 2024 at 7:24 PM Anand Inguva via user 
wrote:

> Hello,
>
> Are you running your pipeline from the python 3.11 environment?  If you
> are running from a python 3.11 environment and don't use a custom docker
> container image, DataflowRunner(Assuming Apache Beam on GCP means Apache
> Beam on DataflowRunner), will use Python 3.11.
>
> Thanks,
> Anand
>


Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-10 Thread Anand Inguva via user
Hello,

Are you running your pipeline from the python 3.11 environment?  If you are
running from a python 3.11 environment and don't use a custom docker
container image, DataflowRunner(Assuming Apache Beam on GCP means Apache
Beam on DataflowRunner), will use Python 3.11.

Thanks,
Anand


Re: Apache Bean on GCP / Forcing to use py 3.11

2024-06-10 Thread Ahmet Altay via user
If you could use py 3.11 locally, you will get python 3.11 in your cloud
environment as well. Is that not happening?

When you run Apache Beam on GCP, the python version you are using in your
local virtual environment will be used in the cloud environment as well. I
believe this is true for non-GCP environments as well.


On Mon, Jun 10, 2024 at 11:08 AM Sofia’s World  wrote:

> Hello
>  Sorry for the partially off-topic question.
> I am running a pipeline in which one of the dependencies needs to run on py
> 3.11,
> but I don't see any options that allow me to force the Python version to be
> used.
>
> Could anyone help?
> Kind regards
> Marco
>


Re: Beam + VertexAI

2024-06-09 Thread XQ Hu via user
If you have a Vertex AI model, try
https://cloud.google.com/dataflow/docs/notebooks/run_inference_vertex_ai
If you want to use the Vertex AI model to do text embedding, try
https://cloud.google.com/dataflow/docs/notebooks/vertex_ai_text_embeddings
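
As a rough sketch of the RunInference route from the first link: the endpoint ID, project, and location below are placeholders, and it assumes a text model is already deployed to a Vertex AI endpoint and that your Beam version ships apache_beam.ml.inference.vertex_ai_inference.

```python
import apache_beam as beam
from apache_beam.ml.inference.base import RunInference
from apache_beam.ml.inference.vertex_ai_inference import VertexAIModelHandlerJSON

# All identifiers below are placeholders for a real deployed endpoint.
model_handler = VertexAIModelHandlerJSON(
    endpoint_id="1234567890",
    project="my-gcp-project",
    location="us-central1",
)

with beam.Pipeline() as pipeline:
    _ = (
        pipeline
        | "ReadNews" >> beam.Create(["article text one", "article text two"])
        | "Summarize" >> RunInference(model_handler)  # emits PredictionResult objects
        | "Print" >> beam.Map(print)
    )
```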

On Sun, Jun 9, 2024 at 4:40 AM Sofia’s World  wrote:

> Hi all,
>  I am looking for samples of integrating Vertex AI into Apache Beam.
> As a sample, I want to create a pipeline that retrieves some news
> items and will invoke
> Vertex AI to summarize the main point of each one...
>
> Could anyone give me some pointers?
> Kind regards
>  marco
>


Re: Paralalelism of a side input

2024-06-07 Thread Robert Bradshaw via user
You can always limit the parallelism by assigning a single key to
every element and then doing a grouping or reshuffle[1] on that key
before processing the elements. Even if the operator parallelism for
that step is technically, say, eight, your effective parallelism will
be exactly one.

[1] 
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html
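
The same trick in Python, as a minimal sketch (fetch_and_parse, the 'singleton' key, and the Create stand-in for the periodic impulse are placeholders, not code from this thread):

```python
import apache_beam as beam


def fetch_and_parse(unused):
    # Placeholder for "call the endpoint and parse the payload".
    return [{"config": "value"}]


with beam.Pipeline() as pipeline:
    side_input_data = (
        pipeline
        | "Ticks" >> beam.Create([None])                       # stand-in for the periodic impulse
        | "SingleKey" >> beam.Map(lambda e: ("singleton", e))  # every element gets the same key
        | "Collapse" >> beam.GroupByKey()                      # all elements now live under one key...
        | "Fetch" >> beam.FlatMap(lambda kv: fetch_and_parse(kv[1]))  # ...so this runs with effective parallelism 1
    )
```

The GroupByKey is what pins all elements of the single key to one worker for the downstream step, regardless of the operator parallelism.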

On Fri, Jun 7, 2024 at 2:13 PM Ruben Vargas  wrote:
>
> Hello guys
>
> One question, I have a side input which fetches an endpoint each 30
> min, I pretty much copied the example here:
> https://beam.apache.org/documentation/patterns/side-inputs/ but added
> some logic to fetch the endpoint and parse the payload.
>
> My question is: it is possible to control the parallelism of this
> single ParDo that does the fetch/transform? I don't think I need a lot
> of parallelism for that one. I'm currently using flink runner and I
> see the parallelism is 8 (which is the general parallelism for my
> flink cluster).
>
> Is it possible to set it to 1 for example?
>
>
> Regards.


Re: Question: Pipelines Stuck with Java 21 and BigQuery Storage Write API

2024-06-07 Thread Yi Hu via user
Hi,

Which runner are you using? If you are running on Dataflow runner, then
refer to this [1] and add
"--jdkAddOpenModules=java.base/java.lang=ALL-UNNAMED" to pipeline option.
If using direct runner, then add
"--add-opens=java.base/java.lang=ALL-UNNAMED" to JVM invocation command
line.

The same enforcement was introduced in both Java 17 and 21, and it is
strange that Java 17 worked without the option but Java 21 didn't. Are you
testing on the same Beam version and other configurations? Also, more
recent Beam versions eliminated most usage of
"ClassLoadingStrategy.Default.INJECTION",
which caused this pipeline option to be required, e.g. [2]. Try the latest
Beam version, 2.56.0, and this option may not be needed.

[1]
https://beam.apache.org/releases/javadoc/current/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.html#getJdkAddOpenModules--

[2] https://github.com/apache/beam/pull/30367



On Mon, Jun 3, 2024 at 7:14 PM XQ Hu  wrote:

> Probably related to the strict encapsulation that is enforced with Java
> 21.
> Use `--add-opens=java.base/java.lang=ALL-UNNAMED` as the JVM flag could be
> a temporary workaround.
>
> On Mon, Jun 3, 2024 at 3:04 AM 田中万葉  wrote:
>
>> Hi all,
>>
>> I encountered an UnsupportedOperationException when using Java 21 and the
>> BigQuery Storage Write API in a Beam pipeline by using
>> ".withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));"
>>
>> Having read issue #28120[1] and understanding that Beam version 2.52.0 or
>> later supports Java 21 as a runtime, I wonder why such an error happens.
>>
>> I found there are two workarounds, but the Storage Write API is a more
>> preferable way to insert data into BigQuery, so I'd like to find a
>> solution.
>>
>> 1. One workaround is to switch from Java 21 to Java 17(openjdk version
>> "17.0.10" 2024-01-16). By changing the compiler source and target
>> settings in the pom.xml file (i.e., without modifying
>> App.java itself), the pipeline successfully writes data to my destination
>> table on BigQuery. It seems Java 17 and BigQuery Storage Write API works
>> fine.
>> 2. The other workaround is to change insert method. I tried the BigQuery
>> legacy streaming API(
>> https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery )
>> instead of the Storage Write API. Even though I still used Java 21, when I
>> changed my code to
>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS));, I did not
>> encounter the error.
>>
>> So I faced the error only when using Java 21 and BigQuery Storage Write
>> API.
>>
>> I uploaded the code below to reproduce. Could you please inform me how to
>> handle this issue?
>> https://github.com/cloud-ace/min-reproduce
>>
>> My Environment
>> - OS
>>   - Ubuntu 22.04
>>   - Mac OS Sonoma(14.3.1)
>> - beam 2.53.0, 2.54.0
>> - openjdk version "21.0.2" 2024-01-16
>> - maven 3.9.6
>> - DirectRunner
>>
>> Thanks,
>>
>> Kazuha
>>
>> [1]: https://github.com/apache/beam/issues/28120
>>
>> Here is the detailed error message.
>>
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.UnsupportedOperationException: Cannot define class using
>> reflection: Unable to make protected java.lang.Package
>> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
>> java.base does not "opens java.lang" to unnamed module @116d5dff
>>
>> Caused by: java.lang.UnsupportedOperationException: Cannot define class
>> using reflection: Unable to make protected java.lang.Package
>> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
>> java.base does not "opens java.lang" to unnamed module @116d5dff
>> at
>> net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection$Dispatcher$Initializable$Unavailable.defineClass
>> (ClassInjector.java:472)
>> at
>> net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection.injectRaw
>> (ClassInjector.java:284)
>> at net.bytebuddy.dynamic.loading.ClassInjector$AbstractBase.inject
>> (ClassInjector.java:118)
>> at
>> net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default$InjectionDispatcher.load
>> (ClassLoadingStrategy.java:241)
>> at net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default.load
>> (ClassLoadingStrategy.java:148)
>> at net.bytebuddy.dynamic.TypeResolutionStrategy$Passive.initialize
>> (TypeResolutionStrategy.java:101)
>> at net.bytebuddy.dynamic.DynamicType$Default$Unloaded.load
>> (DynamicType.java:6317)
>> at
>> org.apache.beam.sdk.schemas.utils.AutoValueUtils.createBuilderCreator
>> (AutoValueUtils.java:247)
>> at org.apache.beam.sdk.schemas.utils.AutoValueUtils.getBuilderCreator
>> (AutoValueUtils.java:225)
>> at org.apache.beam.sdk.schemas.AutoValueSchema.schemaTypeCreator
>> (AutoValueSchema.java:122)
>> at org.apache.beam.sdk.schemas.CachingFactory.create
>> (CachingFactory.java:56)
>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply
>> (FromRowUsingCreator.java:94)
>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply
>> (FromRowUsingCreator.java:45)
>> at 

Re: Question: Pipelines Stuck with Java 21 and BigQuery Storage Write API

2024-06-06 Thread 田中万葉
Hi, XQ
Thank you for your reply.

I tried to implement your suggestion by running `mvn compile exec:java
-Dexec.args="--add-opens=java.base/java.lang=ALL-UNNAMED"` or by editing a
plugin in the pom.xml as shown below.
However, I was unable to resolve this issue. As I am very new to Beam and
the OSS community, I am unsure of what the next step should be. Should I
create an issue on Github, or could you point out where I might be missing
something?

I appreciate your help.

Best regards,

Kazuha

```pom.xml
  
  <plugin>
    <groupId>org.codehaus.mojo</groupId>
    <artifactId>exec-maven-plugin</artifactId>
    <version>3.0.0</version>
    <executions>
      <execution>
        <goals>
          <goal>java</goal>
        </goals>
      </execution>
    </executions>
    <configuration>
      <mainClass>com.example.MainClass</mainClass>
      <arguments>
        <argument>-Dexec.args="--add-opens=java.base/java.lang=ALL-UNNAMED"</argument>
      </arguments>
    </configuration>
  </plugin>
```

On Tue, Jun 4, 2024 at 8:14 AM XQ Hu via user  wrote:

> Probably related to the strict encapsulation that is enforced with Java
> 21.
> Use `--add-opens=java.base/java.lang=ALL-UNNAMED` as the JVM flag could be
> a temporary workaround.
>
> On Mon, Jun 3, 2024 at 3:04 AM 田中万葉  wrote:
>
>> Hi all,
>>
>> I encountered an UnsupportedOperationException when using Java 21 and the
>> BigQuery Storage Write API in a Beam pipeline by using
>> ".withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));"
>>
>> Having read issue #28120[1] and understanding that Beam version 2.52.0 or
>> later supports Java 21 as a runtime, I wonder why such an error happens.
>>
>> I found there are two workarounds, but the Storage Write API is a more
>> preferable way to insert data into BigQuery, so I'd like to find a
>> solution.
>>
>> 1. One workaround is to switch from Java 21 to Java 17(openjdk version
>> "17.0.10" 2024-01-16). By changing the compiler source and target
>> settings in the pom.xml file (i.e., without modifying
>> App.java itself), the pipeline successfully writes data to my destination
>> table on BigQuery. It seems Java 17 and BigQuery Storage Write API works
>> fine.
>> 2. The other workaround is to change insert method. I tried the BigQuery
>> legacy streaming API(
>> https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery )
>> instead of the Storage Write API. Even though I still used Java 21, when I
>> changed my code to
>> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS));, I did not
>> encounter the error.
>>
>> So I faced the error only when using Java 21 and BigQuery Storage Write
>> API.
>>
>> I uploaded the code below to reproduce. Could you please inform me how to
>> handle this issue?
>> https://github.com/cloud-ace/min-reproduce
>>
>> My Environment
>> - OS
>>   - Ubuntu 22.04
>>   - Mac OS Sonoma(14.3.1)
>> - beam 2.53.0, 2.54.0
>> - openjdk version "21.0.2" 2024-01-16
>> - maven 3.9.6
>> - DirectRunner
>>
>> Thanks,
>>
>> Kazuha
>>
>> [1]: https://github.com/apache/beam/issues/28120
>>
>> Here is the detailed error message.
>>
>> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
>> java.lang.UnsupportedOperationException: Cannot define class using
>> reflection: Unable to make protected java.lang.Package
>> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
>> java.base does not "opens java.lang" to unnamed module @116d5dff
>>
>> Caused by: java.lang.UnsupportedOperationException: Cannot define class
>> using reflection: Unable to make protected java.lang.Package
>> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
>> java.base does not "opens java.lang" to unnamed module @116d5dff
>> at
>> net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection$Dispatcher$Initializable$Unavailable.defineClass
>> (ClassInjector.java:472)
>> at
>> net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection.injectRaw
>> (ClassInjector.java:284)
>> at net.bytebuddy.dynamic.loading.ClassInjector$AbstractBase.inject
>> (ClassInjector.java:118)
>> at
>> net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default$InjectionDispatcher.load
>> (ClassLoadingStrategy.java:241)
>> at net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default.load
>> (ClassLoadingStrategy.java:148)
>> at net.bytebuddy.dynamic.TypeResolutionStrategy$Passive.initialize
>> (TypeResolutionStrategy.java:101)
>> at net.bytebuddy.dynamic.DynamicType$Default$Unloaded.load
>> (DynamicType.java:6317)
>> at
>> org.apache.beam.sdk.schemas.utils.AutoValueUtils.createBuilderCreator
>> (AutoValueUtils.java:247)
>> at org.apache.beam.sdk.schemas.utils.AutoValueUtils.getBuilderCreator
>> (AutoValueUtils.java:225)
>> at org.apache.beam.sdk.schemas.AutoValueSchema.schemaTypeCreator
>> (AutoValueSchema.java:122)
>> at org.apache.beam.sdk.schemas.CachingFactory.create
>> (CachingFactory.java:56)
>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply
>> (FromRowUsingCreator.java:94)
>> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply
>> (FromRowUsingCreator.java:45)
>> at org.apache.beam.sdk.schemas.SchemaCoder.decode
>> (SchemaCoder.java:126)
>> at 

Re: Question: Pipelines Stuck with Java 21 and BigQuery Storage Write API

2024-06-03 Thread XQ Hu via user
This is probably related to the strict encapsulation that is enforced with Java 21.
Using `--add-opens=java.base/java.lang=ALL-UNNAMED` as a JVM flag could be
a temporary workaround.

On Mon, Jun 3, 2024 at 3:04 AM 田中万葉  wrote:

> Hi all,
>
> I encountered an UnsupportedOperationException when using Java 21 and the
> BigQuery Storage Write API in a Beam pipeline by using
> ".withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));"
>
> Having read issue #28120[1] and understanding that Beam version 2.52.0 or
> later supports Java 21 as a runtime, I wonder why such an error happens.
>
> I found there are two workarounds, but the Storage Write API is a more
> preferable way to insert data into BigQuery, so I'd like to find a
> solution.
>
> 1. One workaround is to switch from Java 21 to Java 17(openjdk version
> "17.0.10" 2024-01-16). By changing the compiler source and target
> settings in the pom.xml file (i.e., without modifying
> App.java itself), the pipeline successfully writes data to my destination
> table on BigQuery. It seems Java 17 and BigQuery Storage Write API works
> fine.
> 2. The other workaround is to change insert method. I tried the BigQuery
> legacy streaming API(
> https://cloud.google.com/bigquery/docs/streaming-data-into-bigquery )
> instead of the Storage Write API. Even though I still used Java 21, when I
> changed my code to
> .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS));, I did not
> encounter the error.
>
> So I faced the error only when using Java 21 and BigQuery Storage Write
> API.
>
> I uploaded the code below to reproduce. Could you please inform me how to
> handle this issue?
> https://github.com/cloud-ace/min-reproduce
>
> My Environment
> - OS
>   - Ubuntu 22.04
>   - Mac OS Sonoma(14.3.1)
> - beam 2.53.0, 2.54.0
> - openjdk version "21.0.2" 2024-01-16
> - maven 3.9.6
> - DirectRunner
>
> Thanks,
>
> Kazuha
>
> [1]: https://github.com/apache/beam/issues/28120
>
> Here is the detailed error message.
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.lang.UnsupportedOperationException: Cannot define class using
> reflection: Unable to make protected java.lang.Package
> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
> java.base does not "opens java.lang" to unnamed module @116d5dff
>
> Caused by: java.lang.UnsupportedOperationException: Cannot define class
> using reflection: Unable to make protected java.lang.Package
> java.lang.ClassLoader.getPackage(java.lang.String) accessible: module
> java.base does not "opens java.lang" to unnamed module @116d5dff
> at
> net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection$Dispatcher$Initializable$Unavailable.defineClass
> (ClassInjector.java:472)
> at
> net.bytebuddy.dynamic.loading.ClassInjector$UsingReflection.injectRaw
> (ClassInjector.java:284)
> at net.bytebuddy.dynamic.loading.ClassInjector$AbstractBase.inject
> (ClassInjector.java:118)
> at
> net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default$InjectionDispatcher.load
> (ClassLoadingStrategy.java:241)
> at net.bytebuddy.dynamic.loading.ClassLoadingStrategy$Default.load
> (ClassLoadingStrategy.java:148)
> at net.bytebuddy.dynamic.TypeResolutionStrategy$Passive.initialize
> (TypeResolutionStrategy.java:101)
> at net.bytebuddy.dynamic.DynamicType$Default$Unloaded.load
> (DynamicType.java:6317)
> at
> org.apache.beam.sdk.schemas.utils.AutoValueUtils.createBuilderCreator
> (AutoValueUtils.java:247)
> at org.apache.beam.sdk.schemas.utils.AutoValueUtils.getBuilderCreator
> (AutoValueUtils.java:225)
> at org.apache.beam.sdk.schemas.AutoValueSchema.schemaTypeCreator
> (AutoValueSchema.java:122)
> at org.apache.beam.sdk.schemas.CachingFactory.create
> (CachingFactory.java:56)
> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply
> (FromRowUsingCreator.java:94)
> at org.apache.beam.sdk.schemas.FromRowUsingCreator.apply
> (FromRowUsingCreator.java:45)
> at org.apache.beam.sdk.schemas.SchemaCoder.decode
> (SchemaCoder.java:126)
> at org.apache.beam.sdk.coders.Coder.decode (Coder.java:159)
> at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:84)
> at org.apache.beam.sdk.coders.KvCoder.decode (KvCoder.java:37)
> at org.apache.beam.sdk.util.CoderUtils.decodeFromSafeStream
> (CoderUtils.java:142)
> at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray
> (CoderUtils.java:102)
> at org.apache.beam.sdk.util.CoderUtils.decodeFromByteArray
> (CoderUtils.java:96)
> at org.apache.beam.sdk.util.CoderUtils.clone (CoderUtils.java:168)
> at
> org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.
> (MutationDetectors.java:118)
> at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder
> (MutationDetectors.java:49)
> at
> org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add
> (ImmutabilityCheckingBundleFactory.java:115)
> at
> 

Re: Query about autinference of numPartitions for `JdbcIO#readWithPartitions`

2024-05-31 Thread XQ Hu via user
You should be able to configure the number of partitions like this:

https://github.com/GoogleCloudPlatform/dataflow-cookbook/blob/main/Java/src/main/java/jdbc/ReadPartitionsJdbc.java#L132

The code to auto-infer the number of partitions seems to be unreachable (I
haven't checked this carefully). More details are here:
https://issues.apache.org/jira/browse/BEAM-12456

On Fri, May 31, 2024 at 7:40 AM Vardhan Thigle via user <
user@beam.apache.org> wrote:

> Hi Beam Experts, I have a small query about `JdbcIO#readWithPartitions`.
>
>
> Context: JdbcIO#readWithPartitions seems to always default
> to 200 partitions (DEFAULT_NUM_PARTITIONS). This is set by default when the
> object is constructed here
> 
> There seems to be no way to override this with a null value. Hence it
> seems that the code
> 
> that checks the null value and tries to auto-infer the number of partitions
> never runs. I am trying to use this for reading a tall table
> of unknown size, and the pipeline always defaults to 200 if the value is
> not set. The default of 200 seems to fall short, as the worker goes out of
> memory in the reshuffle stage. Running with a higher number of partitions,
> like 4K, helps for my test setup. Since the size is not known at the time of
> implementing the pipeline, the auto-inference might help
> setting maxPartitions to a reasonable value as per the heuristic decided by
> Beam code.
> Request for help
>
> Could you please clarify a few doubts around this?
>
>1. Is this behavior intentional?
>2. Could you please explain the rationale behind the heuristic in L1398
>
> 
> and DEFAULT_NUM_PARTITIONS=200?
>
>
> I have also raised this as issues/31467 in case it needs any changes in
> the implementation.
>
>
> Regards and Thanks,
> Vardhan Thigle,
> +919535346204 <+91%2095353%2046204>
>


Re: Error handling for GCP Pub/Sub on Dataflow using Python

2024-05-25 Thread XQ Hu via user
I do not suggest you handle this in beam.io.WriteToPubSub. You could change
your pipeline to add one transform to check the message size. If it is
beyond 10 MB, you could use another sink or process the message to reduce
the size.
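
A minimal sketch of such a size-check transform follows; it assumes the pipeline already produces encoded bytes, the topic and bucket paths are placeholders, and a streaming job would still need a windowing strategy ahead of the file sink:

```python
import apache_beam as beam
from apache_beam import pvalue

# Pub/Sub rejects requests above roughly 10 MB; this guard value is an assumption.
MAX_PUBSUB_BYTES = 10 * 1024 * 1024


class SplitBySize(beam.DoFn):
    """Routes payloads that are too large for Pub/Sub to a 'too_large' output."""
    def process(self, element: bytes):
        if len(element) >= MAX_PUBSUB_BYTES:
            yield pvalue.TaggedOutput("too_large", element)
        else:
            yield element


# Wiring sketch:
# split = messages | "SplitBySize" >> beam.ParDo(SplitBySize()).with_outputs(
#     "too_large", main="publishable")
# split.publishable | beam.io.WriteToPubSub(topic="projects/my-project/topics/my-topic")
# split.too_large | beam.Map(lambda b: b.decode("utf-8", errors="replace")) \
#                 | beam.io.WriteToText("gs://my-bucket/oversized/msg")
```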

On Fri, May 24, 2024 at 3:46 AM Nimrod Shory  wrote:

> Hello group,
> I am pretty new to Dataflow and Beam.
> I have deployed a Dataflow streaming job using Beam with Python.
> The final step of my pipeline is publishing a message to Pub/Sub.
> In certain cases the message can become too big for Pub/Sub (larger than
> the allowed 10MB) and in that case of failure, it just retries to publish
> indefinitely, causing the Job to eventually stop processing new data.
>
> My question is, is there a way to handle failures in beam.io.WriteToPubSub
> or should I implement a similar method myself?
>
> Ideally, I would like to write the too large messages to a file on cloud
> storage.
>
> Any ideas will be appreciated.
>
> Thanks in advance for your help!
>
>


Re: Question: Java Apache Beam, mock external Clients initialized in Setup

2024-05-25 Thread XQ Hu via user
I am not sure which part you want to test. If the processData part should
be tested, you could refactor the code so it does not use any Beam-specific
code, and test the data-processing logic on its own.

From your example, it seems that you are calling some APIs. We recently
added a new Web API IO:
https://beam.apache.org/documentation/io/built-in/webapis/,
which provides a way to test such calls.

On Wed, May 22, 2024 at 5:06 PM Ritwik Dutta via dev 
wrote:

> any response yet? No one has answers? I left a stackoverflow bounty on the
> question
>
> Using external methods is pretty important
>
> On Sunday, May 12, 2024 at 11:52:25 AM PDT, Ritwik Dutta <
> rdutt...@yahoo.com> wrote:
>
>
> Hi,
> I wrote the following question here.
> It would be really helpful also, if you can also update your documentation
> on Using Test Fakes in different Situations. It was very light
> documentation. Please provide more explanation and examples.
> https://beam.apache.org/documentation/io/testing/#:~:text=non%2DBeam%20client.-,Use%20fakes,-Instead%20of%20using
>
>
> *Question: *Java Apache Beam, mock external Clients initialized in @Setup
> method of DoFn with Constructors variables
>
> https://stackoverflow.com/questions/78468953/java-apache-beam-mock-external-clients-initialized-in-setup-method-of-dofn-wit
>
> Thanks,
>
> -Ritwik Dutta
>  734-262-4285 <(734)%20262-4285>
>


Re: KafkaIO/FixedWindow changes 2.56?

2024-05-21 Thread Yarden BenMoshe
I agree it looks very similar, will continue to look at it with the help of
this issue.
Thanks, Jan!


Does the description in [1] match your case?
>
> [1] https://github.com/apache/beam/issues/31085#issuecomment-2115304242
> On 5/19/24 10:07, Yarden BenMoshe wrote:
>
> I am not running my pipeline from command-line, so used instead:
> options.setExperiments(Collections.singletonList("use_deprecated_read"));
>
> with ExperimentalOptions added to my options interface, however I dont
> think there's any effect to using it. in terms of the watermark, i received
> again:
> WatermarkHold.addHolds: element hold at 2024-05-19T07:52:59.999Z is on
> time for key:aaa-bbb-ccc;
> window:[2024-05-19T07:52:00.000Z..2024-05-19T07:53:00.000Z);
> inputWatermark:-290308-12-21T19:59:05.225Z;
> outputWatermark:-290308-12-21T19:59:05.225Z
>
>
>
> On Thu, May 16, 2024 at 5:06 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Does using --experiments=use_deprecated_read have any effect?
>> On 5/16/24 14:30, Yarden BenMoshe wrote:
>>
>> Hi Jan, my PipelineOptions is as follows:
>> options.setStreaming(true);
>> options.setAttachedMode(false);
>> options.setRunner(FlinkRunner.class);
>>
>> I've also tried adding:
>> options.setAutoWatermarkInterval(100L);
>> as seen in some github issue, without any success so far.
>>
>> other than that, i am working with parallelism:3 and number of task
>> slots: 3
>>
>> Thanks!
>> Yarden
>>
>> On Thu, May 16, 2024 at 3:05 PM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi Yarden,
>>>
>>> can you please provide all flink-related PipelineOptions you use for the
>>> job?
>>>
>>>   Jan
>>>
>>> On 5/16/24 13:44, Yarden BenMoshe wrote:
>>> > Hi all,
>>> > I have a project running with Beam 2.51, using Flink runner. In one of
>>> > my pipelines i have a FixedWindow and had a problem upgrading until
>>> > now, with a timers issue now resolved, and hopefully allowing me to
>>> > upgrade to version 2.56
>>> > However, I encounter another problem now which I believe is related to
>>> > watermarking(?).
>>> > My pipeline's source is a kafka topic.
>>> > My basic window definition is:
>>> >
>>> > PCollection>> windowCustomObjectInfo
>>> > = customObject.apply("windowCustomObjectInfo",
>>> > Window.into(FixedWindows.of(Duration.standardSeconds(60)))).apply(GroupByKey.create());
>>> >
>>> > and ever since upgrading to version 2.56 I am not getting any output
>>> > from that window. when enabling TRACE logs, i have this message:
>>> >
>>> > 2024-05-12 13:50:55,257 TRACE org.apache.beam.sdk.util.WindowTracing
>>> > [] - WatermarkHold.addHolds: element hold at 2024-05-12T13:50:59.999Z
>>> > is on time for key:test-12345;
>>> > window:[2024-05-12T13:50:00.000Z..2024-05-12T13:51:00.000Z);
>>> > inputWatermark:-290308-12-21T19:59:05.225Z;
>>> > outputWatermark:-290308-12-21T19:59:05.225Z
>>> >
>>> >
>>> > Any hints on where should I look or maybe how I can adjust my window
>>> > definition? Are you familiar with any change that might be the cause
>>> > for my issue?
>>> > Thanks
>>>
>>


Re: KafkaIO/FixedWindow changes 2.56?

2024-05-21 Thread Jan Lukavský

Does the description in [1] match your case?

[1] https://github.com/apache/beam/issues/31085#issuecomment-2115304242

On 5/19/24 10:07, Yarden BenMoshe wrote:

I am not running my pipeline from command-line, so used instead:
options.setExperiments(Collections.singletonList("use_deprecated_read"));

with ExperimentalOptions added to my options interface, however I dont 
think there's any effect to using it. in terms of the watermark, i 
received again:
WatermarkHold.addHolds: element hold at 2024-05-19T07:52:59.999Z is on 
time for key:aaa-bbb-ccc; 
window:[2024-05-19T07:52:00.000Z..2024-05-19T07:53:00.000Z); 
inputWatermark:-290308-12-21T19:59:05.225Z; 
outputWatermark:-290308-12-21T19:59:05.225Z




On Thu, May 16, 2024 at 5:06 PM Jan Lukavský <je...@seznam.cz> wrote:


Does using --experiments=use_deprecated_read have any effect?

On 5/16/24 14:30, Yarden BenMoshe wrote:

Hi Jan, my PipelineOptions is as follows:
options.setStreaming(true);
options.setAttachedMode(false);
options.setRunner(FlinkRunner.class);

I've also tried adding:
options.setAutoWatermarkInterval(100L);
as seen in some github issue, without any success so far.

other than that, i am working with parallelism:3 and number of
task slots: 3

Thanks!
Yarden

On Thu, May 16, 2024 at 3:05 PM Jan Lukavský <je...@seznam.cz> wrote:

Hi Yarden,

can you please provide all flink-related PipelineOptions you
use for the
job?

  Jan

On 5/16/24 13:44, Yarden BenMoshe wrote:
> Hi all,
> I have a project running with Beam 2.51, using Flink
runner. In one of
> my pipelines i have a FixedWindow and had a problem
upgrading until
> now, with a timers issue now resolved, and hopefully
allowing me to
> upgrade to version 2.56
> However, I encounter another problem now which I believe is
related to
> watermarking(?).
> My pipeline's source is a kafka topic.
> My basic window definition is:
>
> PCollection>> windowCustomObjectInfo
> = customObject.apply("windowCustomObjectInfo",
> Window.into(FixedWindows.of(Duration.standardSeconds(60)))).apply(GroupByKey.create());
>
> and ever since upgrading to version 2.56 I am not getting
any output
> from that window. when enabling TRACE logs, i have this
message:
>
> 2024-05-12 13:50:55,257 TRACE
org.apache.beam.sdk.util.WindowTracing
> [] - WatermarkHold.addHolds: element hold at
2024-05-12T13:50:59.999Z
> is on time for key:test-12345;
> window:[2024-05-12T13:50:00.000Z..2024-05-12T13:51:00.000Z);
> inputWatermark:-290308-12-21T19:59:05.225Z;
> outputWatermark:-290308-12-21T19:59:05.225Z
>
>
> Any hints on where should I look or maybe how I can adjust
my window
> definition? Are you familiar with any change that might be
the cause
> for my issue?
> Thanks


Re: KafkaIO/FixedWindow changes 2.56?

2024-05-19 Thread Yarden BenMoshe
I am not running my pipeline from the command line, so I used instead:
options.setExperiments(Collections.singletonList("use_deprecated_read"));

with ExperimentalOptions added to my options interface; however, I don't
think it has any effect. In terms of the watermark, I received again:
WatermarkHold.addHolds: element hold at 2024-05-19T07:52:59.999Z is on time
for key:aaa-bbb-ccc;
window:[2024-05-19T07:52:00.000Z..2024-05-19T07:53:00.000Z);
inputWatermark:-290308-12-21T19:59:05.225Z;
outputWatermark:-290308-12-21T19:59:05.225Z



On Thu, May 16, 2024 at 5:06 PM Jan Lukavský <je...@seznam.cz> wrote:

> Does using --experiments=use_deprecated_read have any effect?
> On 5/16/24 14:30, Yarden BenMoshe wrote:
>
> Hi Jan, my PipelineOptions is as follows:
> options.setStreaming(true);
> options.setAttachedMode(false);
> options.setRunner(FlinkRunner.class);
>
> I've also tried adding:
> options.setAutoWatermarkInterval(100L);
> as seen in some github issue, without any success so far.
>
> other than that, i am working with parallelism:3 and number of task
> slots: 3
>
> Thanks!
> Yarden
>
> On Thu, May 16, 2024 at 3:05 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Yarden,
>>
>> can you please provide all flink-related PipelineOptions you use for the
>> job?
>>
>>   Jan
>>
>> On 5/16/24 13:44, Yarden BenMoshe wrote:
>> > Hi all,
>> > I have a project running with Beam 2.51, using Flink runner. In one of
>> > my pipelines i have a FixedWindow and had a problem upgrading until
>> > now, with a timers issue now resolved, and hopefully allowing me to
>> > upgrade to version 2.56
>> > However, I encounter another problem now which I believe is related to
>> > watermarking(?).
>> > My pipeline's source is a kafka topic.
>> > My basic window definition is:
>> >
>> > PCollection>> windowCustomObjectInfo
>> > = customObject.apply("windowCustomObjectInfo",
>> > Window.into(FixedWindows.of(Duration.standardSeconds(60)))).apply(GroupByKey.create());
>> >
>> > and ever since upgrading to version 2.56 I am not getting any output
>> > from that window. when enabling TRACE logs, i have this message:
>> >
>> > 2024-05-12 13:50:55,257 TRACE org.apache.beam.sdk.util.WindowTracing
>> > [] - WatermarkHold.addHolds: element hold at 2024-05-12T13:50:59.999Z
>> > is on time for key:test-12345;
>> > window:[2024-05-12T13:50:00.000Z..2024-05-12T13:51:00.000Z);
>> > inputWatermark:-290308-12-21T19:59:05.225Z;
>> > outputWatermark:-290308-12-21T19:59:05.225Z
>> >
>> >
>> > Any hints on where should I look or maybe how I can adjust my window
>> > definition? Are you familiar with any change that might be the cause
>> > for my issue?
>> > Thanks
>>
>


Re: KafkaIO/FixedWindow changes 2.56?

2024-05-16 Thread Jan Lukavský

Does using --experiments=use_deprecated_read have any effect?

On 5/16/24 14:30, Yarden BenMoshe wrote:

Hi Jan, my PipelineOptions is as follows:
options.setStreaming(true);
options.setAttachedMode(false);
options.setRunner(FlinkRunner.class);

I've also tried adding:
options.setAutoWatermarkInterval(100L);
as seen in some github issue, without any success so far.

other than that, i am working with parallelism:3 and number of task 
slots: 3


Thanks!
Yarden

On Thu, May 16, 2024 at 3:05 PM Jan Lukavský <je...@seznam.cz> wrote:


Hi Yarden,

can you please provide all flink-related PipelineOptions you use
for the
job?

  Jan

On 5/16/24 13:44, Yarden BenMoshe wrote:
> Hi all,
> I have a project running with Beam 2.51, using Flink runner. In
one of
> my pipelines i have a FixedWindow and had a problem upgrading until
> now, with a timers issue now resolved, and hopefully allowing me to
> upgrade to version 2.56
> However, I encounter another problem now which I believe is
related to
> watermarking(?).
> My pipeline's source is a kafka topic.
> My basic window definition is:
>
> PCollection>> windowCustomObjectInfo
> = customObject.apply("windowCustomObjectInfo",
> Window.into(FixedWindows.of(Duration.standardSeconds(60)))).apply(GroupByKey.create());
>
> and ever since upgrading to version 2.56 I am not getting any
output
> from that window. when enabling TRACE logs, i have this message:
>
> 2024-05-12 13:50:55,257 TRACE
org.apache.beam.sdk.util.WindowTracing
> [] - WatermarkHold.addHolds: element hold at
2024-05-12T13:50:59.999Z
> is on time for key:test-12345;
> window:[2024-05-12T13:50:00.000Z..2024-05-12T13:51:00.000Z);
> inputWatermark:-290308-12-21T19:59:05.225Z;
> outputWatermark:-290308-12-21T19:59:05.225Z
>
>
> Any hints on where should I look or maybe how I can adjust my
window
> definition? Are you familiar with any change that might be the
cause
> for my issue?
> Thanks


Re: KafkaIO/FixedWindow changes 2.56?

2024-05-16 Thread Yarden BenMoshe
Hi Jan, my PipelineOptions is as follows:
options.setStreaming(true);
options.setAttachedMode(false);
options.setRunner(FlinkRunner.class);

I've also tried adding:
options.setAutoWatermarkInterval(100L);
as seen in some github issue, without any success so far.

Other than that, I am working with parallelism: 3 and number of task slots: 3.

Thanks!
Yarden

On Thu, May 16, 2024 at 3:05 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Yarden,
>
> can you please provide all flink-related PipelineOptions you use for the
> job?
>
>   Jan
>
> On 5/16/24 13:44, Yarden BenMoshe wrote:
> > Hi all,
> > I have a project running with Beam 2.51, using Flink runner. In one of
> > my pipelines i have a FixedWindow and had a problem upgrading until
> > now, with a timers issue now resolved, and hopefully allowing me to
> > upgrade to version 2.56
> > However, I encounter another problem now which I believe is related to
> > watermarking(?).
> > My pipeline's source is a kafka topic.
> > My basic window definition is:
> >
> > PCollection>> windowCustomObjectInfo
> > = customObject.apply("windowCustomObjectInfo",
> > Window.into(FixedWindows.of(Duration.standardSeconds(60)))).apply(GroupByKey.create());
> >
> > and ever since upgrading to version 2.56 I am not getting any output
> > from that window. when enabling TRACE logs, i have this message:
> >
> > 2024-05-12 13:50:55,257 TRACE org.apache.beam.sdk.util.WindowTracing
> > [] - WatermarkHold.addHolds: element hold at 2024-05-12T13:50:59.999Z
> > is on time for key:test-12345;
> > window:[2024-05-12T13:50:00.000Z..2024-05-12T13:51:00.000Z);
> > inputWatermark:-290308-12-21T19:59:05.225Z;
> > outputWatermark:-290308-12-21T19:59:05.225Z
> >
> >
> > Any hints on where should I look or maybe how I can adjust my window
> > definition? Are you familiar with any change that might be the cause
> > for my issue?
> > Thanks
>


Re: KafkaIO/FixedWindow changes 2.56?

2024-05-16 Thread Jan Lukavský

Hi Yarden,

can you please provide all flink-related PipelineOptions you use for the 
job?


 Jan

On 5/16/24 13:44, Yarden BenMoshe wrote:

Hi all,
I have a project running with Beam 2.51, using Flink runner. In one of 
my pipelines i have a FixedWindow and had a problem upgrading until 
now, with a timers issue now resolved, and hopefully allowing me to 
upgrade to version 2.56
However, I encounter another problem now which I believe is related to 
watermarking(?).

My pipeline's source is a kafka topic.
My basic window definition is:

PCollection>> windowCustomObjectInfo
= customObject.apply("windowCustomObjectInfo",
Window.into(FixedWindows.of(Duration.standardSeconds(60)))).apply(GroupByKey.create());


and ever since upgrading to version 2.56 I am not getting any output 
from that window. when enabling TRACE logs, i have this message:


2024-05-12 13:50:55,257 TRACE org.apache.beam.sdk.util.WindowTracing 
[] - WatermarkHold.addHolds: element hold at 2024-05-12T13:50:59.999Z 
is on time for key:test-12345; 
window:[2024-05-12T13:50:00.000Z..2024-05-12T13:51:00.000Z); 
inputWatermark:-290308-12-21T19:59:05.225Z; 
outputWatermark:-290308-12-21T19:59:05.225Z



Any hints on where should I look or maybe how I can adjust my window 
definition? Are you familiar with any change that might be the cause 
for my issue?

Thanks


Re: Fails to deploy a python pipeline to a flink cluster

2024-05-11 Thread Jaehyeon Kim
Hi XQ

I haven't changed anything and the issue persists on my end. The print
statements are called only when self.verbose is True and, by default, it is False.

BTW, do you have any idea about the error message? I haven't seen such an error before.

Cheers,
Jaehyeon

On Sun, 12 May 2024, 12:15 am XQ Hu via user,  wrote:

> Do you still have the same issue? I tried to follow your setup.sh to
> reproduce this but somehow I am stuck at the word_len step. I saw you also
> tried to use `print(kafka_kv)` to debug it. I am not sure about your
> current status.
>
> On Fri, May 10, 2024 at 9:18 AM Jaehyeon Kim  wrote:
>
>> Hello,
>>
>> I'm playing with deploying a python pipeline to a flink cluster on
>> kubernetes via flink kubernetes operator. The pipeline simply calculates
>> average word lengths in a fixed time window of 5 seconds and it works with
>> the embedded flink cluster.
>>
>> First, I created a k8s cluster (v1.25.3) on minikube and a docker image
>> named beam-python-example:1.17 created using the following docker file -
>> the full details can be checked in
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile
>>
>> The java sdk is used for the sdk harness of the kafka io's expansion
>> service while the job server is used to execute the python pipeline in the
>> flink operator.
>>
>> FROM flink:1.17
>> ...
>> ## add java SDK and job server
>> COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/
>> /opt/apache/beam/
>>
>> COPY --from=apache/beam_flink1.17_job_server:2.56.0  \
>>   /opt/apache/beam/jars/beam-runners-flink-job-server.jar
>> /opt/apache/beam/jars/beam-runners-flink-job-server.jar
>>
>> RUN chown -R flink:flink /opt/apache/beam
>>
>> ## install python 3.10.13
>> RUN apt-get update -y && \
>>   apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
>> libffi-dev liblzma-dev && \
>>   wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-
>> ${PYTHON_VERSION}.tgz && \
>> ...
>> ## install apache beam 2.56.0
>> RUN pip3 install apache-beam==${BEAM_VERSION}
>>
>> ## copy pipeline source
>> RUN mkdir /opt/flink/app
>> COPY word_len.py /opt/flink/app/
>>
>> Then the pipeline is deployed using the following manifest - the full
>> details can be found in
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml
>>
>> apiVersion: flink.apache.org/v1beta1
>> kind: FlinkDeployment
>> metadata:
>>   name: beam-word-len
>> spec:
>>   image: beam-python-example:1.17
>>   imagePullPolicy: Never
>>   flinkVersion: v1_17
>>   flinkConfiguration:
>> taskmanager.numberOfTaskSlots: "5"
>>   serviceAccount: flink
>>   podTemplate:
>> spec:
>>   containers:
>> - name: flink-main-container
>>   env:
>> - name: BOOTSTRAP_SERVERS
>>   value: demo-cluster-kafka-bootstrap:9092
>> ...
>>   jobManager:
>> resource:
>>   memory: "2048m"
>>   cpu: 1
>>   taskManager:
>> replicas: 2
>> resource:
>>   memory: "2048m"
>>   cpu: 1
>> podTemplate:
>>   spec:
>> containers:
>>   - name: python-worker-harness
>> image: apache/beam_python3.10_sdk:2.56.0
>> imagePullPolicy: Never
>> args: ["--worker_pool"]
>> ports:
>>   - containerPort: 5
>>
>>   job:
>> jarURI:
>> local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar
>> entryClass:
>> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint
>> args:
>>   - "--driver-cmd"
>>   - "python /opt/flink/app/word_len.py --deploy"
>> parallelism: 3
>> upgradeMode: stateless
>>
>> Here is the pipeline source - the full details can be found in
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py
>>
>> When I add the --deploy flag, the python sdk harness is set to EXTERNAL
>> and its config is set to localhost:5 - I believe it'll point to the
>> side car container of the task manager. For the kafka io, the expansion
>> service's sdk harness is configured as PROCESS and the command points to
>> the java sdk that is added in the beam-python-example:1.17 image.
>>
>> ...
>> def run(args=None):
>> parser = argparse.ArgumentParser(description="Beam pipeline
>> arguments")
>> parser.add_argument("--runner", default="FlinkRunner", help="Apache
>> Beam runner")
>> parser.add_argument(
>> "--deploy",
>> action="store_true",
>> default="Flag to indicate whether to use an own local cluster",
>> )
>> opts, _ = parser.parse_known_args(args)
>>
>> pipeline_opts = {
>> "runner": opts.runner,
>> "job_name": "avg-word-length-beam",
>> "streaming": True,
>> "environment_type": "EXTERNAL" if opts.deploy is True else
>> "LOOPBACK",
>> "checkpointing_interval": "6",
>> }
>>
>> expansion_service = None
>> if pipeline_opts["environment_type"] == 

Re: Fails to deploy a python pipeline to a flink cluster

2024-05-11 Thread XQ Hu via user
Do you still have the same issue? I tried to follow your setup.sh to
reproduce this but somehow I am stuck at the word_len step. I saw you also
tried to use `print(kafka_kv)` to debug it. I am not sure about your
current status.

On Fri, May 10, 2024 at 9:18 AM Jaehyeon Kim  wrote:

> Hello,
>
> I'm playing with deploying a python pipeline to a flink cluster on
> kubernetes via flink kubernetes operator. The pipeline simply calculates
> average word lengths in a fixed time window of 5 seconds and it works with
> the embedded flink cluster.
>
> First, I created a k8s cluster (v1.25.3) on minikube and a docker image
> named beam-python-example:1.17 created using the following docker file -
> the full details can be checked in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/Dockerfile
>
> The java sdk is used for the sdk harness of the kafka io's expansion
> service while the job server is used to execute the python pipeline in the
> flink operator.
>
> FROM flink:1.17
> ...
> ## add java SDK and job server
> COPY --from=apache/beam_java8_sdk:2.56.0 /opt/apache/beam/
> /opt/apache/beam/
>
> COPY --from=apache/beam_flink1.17_job_server:2.56.0  \
>   /opt/apache/beam/jars/beam-runners-flink-job-server.jar
> /opt/apache/beam/jars/beam-runners-flink-job-server.jar
>
> RUN chown -R flink:flink /opt/apache/beam
>
> ## install python 3.10.13
> RUN apt-get update -y && \
>   apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev
> libffi-dev liblzma-dev && \
>   wget https://www.python.org/ftp/python/${PYTHON_VERSION}/Python-
> ${PYTHON_VERSION}.tgz && \
> ...
> ## install apache beam 2.56.0
> RUN pip3 install apache-beam==${BEAM_VERSION}
>
> ## copy pipeline source
> RUN mkdir /opt/flink/app
> COPY word_len.py /opt/flink/app/
>
> Then the pipeline is deployed using the following manifest - the full
> details can be found in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.yml
>
> apiVersion: flink.apache.org/v1beta1
> kind: FlinkDeployment
> metadata:
>   name: beam-word-len
> spec:
>   image: beam-python-example:1.17
>   imagePullPolicy: Never
>   flinkVersion: v1_17
>   flinkConfiguration:
> taskmanager.numberOfTaskSlots: "5"
>   serviceAccount: flink
>   podTemplate:
> spec:
>   containers:
> - name: flink-main-container
>   env:
> - name: BOOTSTRAP_SERVERS
>   value: demo-cluster-kafka-bootstrap:9092
> ...
>   jobManager:
> resource:
>   memory: "2048m"
>   cpu: 1
>   taskManager:
> replicas: 2
> resource:
>   memory: "2048m"
>   cpu: 1
> podTemplate:
>   spec:
> containers:
>   - name: python-worker-harness
> image: apache/beam_python3.10_sdk:2.56.0
> imagePullPolicy: Never
> args: ["--worker_pool"]
> ports:
>   - containerPort: 5
>
>   job:
> jarURI:
> local:///opt/apache/beam/jars/beam-runners-flink-job-server.jar
> entryClass:
> org.apache.beam.runners.flink.FlinkPortableClientEntryPoint
> args:
>   - "--driver-cmd"
>   - "python /opt/flink/app/word_len.py --deploy"
> parallelism: 3
> upgradeMode: stateless
>
> Here is the pipeline source - the full details can be found in
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-deploy/beam-deploy/beam/word_len.py
>
> When I add the --deploy flag, the python sdk harness is set to EXTERNAL
> and its config is set to localhost:5 - I believe it'll point to the
> side car container of the task manager. For the kafka io, the expansion
> service's sdk harness is configured as PROCESS and the command points to
> the java sdk that is added in the beam-python-example:1.17 image.
>
> ...
> def run(args=None):
> parser = argparse.ArgumentParser(description="Beam pipeline arguments"
> )
> parser.add_argument("--runner", default="FlinkRunner", help="Apache
> Beam runner")
> parser.add_argument(
> "--deploy",
> action="store_true",
> default="Flag to indicate whether to use an own local cluster",
> )
> opts, _ = parser.parse_known_args(args)
>
> pipeline_opts = {
> "runner": opts.runner,
> "job_name": "avg-word-length-beam",
> "streaming": True,
> "environment_type": "EXTERNAL" if opts.deploy is True else
> "LOOPBACK",
> "checkpointing_interval": "6",
> }
>
> expansion_service = None
> if pipeline_opts["environment_type"] == "EXTERNAL":
> pipeline_opts = {
> **pipeline_opts,
> **{
> "environment_config": "localhost:5",
> "flink_submit_uber_jar": True,
> },
> }
> expansion_service = kafka.default_io_expansion_service(
> append_args=[
> "--defaultEnvironmentType=PROCESS",
>
> '--defaultEnvironmentConfig={"command":"/opt/apache/beam/boot"}',
> 

Re: Query about `JdbcIO.PoolableDataSourceProvider`

2024-05-08 Thread Yi Hu
Hi Vardhan,

I checked the source code and history of PoolableDataSourceProvider; here are my 
findings:

- PoolableDataSourceProvider is already a static singleton [1], which means 
one DataSource for each DataSourceConfiguration, per worker. More 
specifically, multiple threads within a worker should share a connection if 
they connect to the same database. PoolableDataSourceProvider should also support 
connecting to different databases, because the underlying singleton Map is 
keyed by DataSourceConfiguration.

- However, I notice there is another open issue [2] claiming "the current 
implementation default parameters cannot cover all cases". I am wondering if 
this is the case and leads to the "overwhelm the source db" behavior you observe?

[1] https://github.com/apache/beam/pull/8635

[2] https://github.com/apache/beam/issues/19393

In any case, one can define their own withDataSourceProviderFn (as mentioned by 
[2]) that implements a custom connection pool.

Best,
Yi

On 2024/05/04 12:18:47 Vardhan Thigle via user wrote:
> Hi Beam Experts,
> 
> I had a small query about `JdbcIO.PoolableDataSourceProvider`
> 
> As per the main documentation of JdbcIO,
> (IIUC) `JdbcIO.PoolableDataSourceProvider` creates one DataSource per
> execution thread by default, which can overwhelm the source db.
>
> Whereas, as per the Javadoc of JdbcIO.PoolableDataSourceProvider:
>
> At most a single DataSource instance will be constructed during pipeline
> execution for each unique JdbcIO.DataSourceConfiguration within
> the pipeline.
> 
> If I want a singleton poolable connection for a given source database and
> my pipeline is dealing with multiple source databases, do I need to wrap
> the `JdbcIO.PoolableDataSourceProvider` in another concurrent hash map
> (from the implementation it looks like that's what it does already and it's
> not needed)? I am a bit confused due to the variation in the 2 docs above
> (it's quite possible that I am interpreting them wrong).
> Would it be more recommended to roll out a custom class, as suggested in the
> main documentation of JdbcIO, in cases like:
> 1. configuring the pool config, or
> 2. using an alternative source, say Hikari, which if I understand correctly
> is not possible with JdbcIO.PoolableDataSourceProvider.
> 
> 
> 
> 
> Regards and Thanks,
> Vardhan Thigle,
> +919535346204
> 


Re: Pipeline gets stuck when chaining two SDFs (Python SDK)

2024-05-05 Thread XQ Hu via user
I added this issue here
https://github.com/apache/beam/issues/24528#issuecomment-2095026324
But we do not plan to fix this for the Python DirectRunner, since Prism will
become the default local runner when it is ready.

On Sun, May 5, 2024 at 2:41 PM Jaehyeon Kim  wrote:

> Hi XQ
>
> Yes, it works with the FlinkRunner. Thank you so much!
>
> Cheers,
> Jaehyeon
>
> [image: image.png]
>
> On Mon, 6 May 2024 at 02:49, XQ Hu via user  wrote:
>
>> Have you tried to use other runners? I think this might be caused by some
>> gaps in Python DirectRunner to support the streaming cases or SDFs,
>>
>> On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim  wrote:
>>
>>> Hi XQ
>>>
>>> Thanks for checking it out. SDFs chaining seems to work as I created my
>>> pipeline while converting a pipeline that is built in the Java SDK. The
>>> source of the Java pipeline can be found in
>>> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>>>
>>> So far, when I yield outputs, the second SDF gets stuck while it gets
>>> executed if I return them (but the first SDF completes). If I change the
>>> second SDF into a do function without adding the tracker, it is executed
>>> fine. Not sure what happens in the first scenario.
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>> On Sun, 5 May 2024 at 09:21, XQ Hu via user 
>>> wrote:
>>>
 I played with your example. Indeed, create_tracker in
 your ProcessFilesFn is never called, which is quite strange.
 I could not find any example that shows the chained SDFs, which makes
 me wonder whether the chained SDFs work.

 @Chamikara Jayalath  Any thoughts?

 On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim  wrote:

> Hello,
>
> I am building a pipeline using two SDFs that are chained. The first
> function (DirectoryWatchFn) checks a folder continuously and grabs if a 
> new
> file is added. The second one (ProcessFilesFn) processes a file
> while splitting each line - the processing simply prints the file name and
> line number.
>
> The process function of the first SDF gets stuck if I yield a new file
> object. Specifically, although the second SDF is called as I can check the
> initial restriction is created, the tracker is not created at all!
>
> On the other hand, if I return the file object list, the second SDF
> works fine but the issue is the first SDF stops as soon as it returns the
> first list of files.
>
> The source of the pipeline can be found in
> - First SDF:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
> - Second SDF:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
> - Pipeline:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>
> Can you please inform me how to handle this issue?
>
> Cheers,
> Jaehyeon
>
> class DirectoryWatchFn(beam.DoFn):
> POLL_TIMEOUT = 10
>
> @beam.DoFn.unbounded_per_element()
> def process(
> self,
> element: str,
> tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
> DirectoryWatchRestrictionProvider()
> ),
> watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.
> WatermarkEstimatorParam(
> DirectoryWatchWatermarkEstimatorProvider()
> ),
> ) -> typing.Iterable[MyFile]:
> new_files = self._get_new_files_if_any(element, tracker)
> if self._process_new_files(tracker, watermark_estimater,
> new_files):
> # return [new_file[0] for new_file in new_files] #<-- it
> doesn't get stuck but the SDF finishes
> for new_file in new_files: #<--- it gets stuck if
> yielding file objects
> yield new_file[0]
> else:
> return
> tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>
> def _get_new_files_if_any(
> self, element: str, tracker: DirectoryWatchRestrictionTracker
> ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
> new_files = []
> for file in os.listdir(element):
> if (
> os.path.isfile(os.path.join(element, file))
> and file not in tracker.current_restriction().
> already_processed
> ):
> num_lines = sum(1 for _ in open(os.path.join(element,
> file)))
> new_file = MyFile(file, 0, num_lines)
> print(new_file)
> new_files.append(
> (
> new_file,
> 

Re: Pipeline gets stuck when chaining two SDFs (Python SDK)

2024-05-05 Thread Jaehyeon Kim
Hi XQ

Yes, it works with the FlinkRunner. Thank you so much!

Cheers,
Jaehyeon

[image: image.png]

On Mon, 6 May 2024 at 02:49, XQ Hu via user  wrote:

> Have you tried to use other runners? I think this might be caused by some
> gaps in Python DirectRunner to support the streaming cases or SDFs,
>
> On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim  wrote:
>
>> Hi XQ
>>
>> Thanks for checking it out. SDFs chaining seems to work as I created my
>> pipeline while converting a pipeline that is built in the Java SDK. The
>> source of the Java pipeline can be found in
>> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>>
>> So far, when I yield outputs, the second SDF gets stuck while it gets
>> executed if I return them (but the first SDF completes). If I change the
>> second SDF into a do function without adding the tracker, it is executed
>> fine. Not sure what happens in the first scenario.
>>
>> Cheers,
>> Jaehyeon
>>
>> On Sun, 5 May 2024 at 09:21, XQ Hu via user  wrote:
>>
>>> I played with your example. Indeed, create_tracker in
>>> your ProcessFilesFn is never called, which is quite strange.
>>> I could not find any example that shows the chained SDFs, which makes me
>>> wonder whether the chained SDFs work.
>>>
>>> @Chamikara Jayalath  Any thoughts?
>>>
>>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim  wrote:
>>>
 Hello,

 I am building a pipeline using two SDFs that are chained. The first
 function (DirectoryWatchFn) checks a folder continuously and grabs if a new
 file is added. The second one (ProcessFilesFn) processes a file
 while splitting each line - the processing simply prints the file name and
 line number.

 The process function of the first SDF gets stuck if I yield a new file
 object. Specifically, although the second SDF is called as I can check the
 initial restriction is created, the tracker is not created at all!

 On the other hand, if I return the file object list, the second SDF
 works fine but the issue is the first SDF stops as soon as it returns the
 first list of files.

 The source of the pipeline can be found in
 - First SDF:
 https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
 - Second SDF:
 https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
 - Pipeline:
 https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py

 Can you please inform me how to handle this issue?

 Cheers,
 Jaehyeon

 class DirectoryWatchFn(beam.DoFn):
 POLL_TIMEOUT = 10

 @beam.DoFn.unbounded_per_element()
 def process(
 self,
 element: str,
 tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
 DirectoryWatchRestrictionProvider()
 ),
 watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.
 WatermarkEstimatorParam(
 DirectoryWatchWatermarkEstimatorProvider()
 ),
 ) -> typing.Iterable[MyFile]:
 new_files = self._get_new_files_if_any(element, tracker)
 if self._process_new_files(tracker, watermark_estimater,
 new_files):
 # return [new_file[0] for new_file in new_files] #<-- it
 doesn't get stuck but the SDF finishes
 for new_file in new_files: #<--- it gets stuck if yielding
 file objects
 yield new_file[0]
 else:
 return
 tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))

 def _get_new_files_if_any(
 self, element: str, tracker: DirectoryWatchRestrictionTracker
 ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
 new_files = []
 for file in os.listdir(element):
 if (
 os.path.isfile(os.path.join(element, file))
 and file not in tracker.current_restriction().
 already_processed
 ):
 num_lines = sum(1 for _ in open(os.path.join(element,
 file)))
 new_file = MyFile(file, 0, num_lines)
 print(new_file)
 new_files.append(
 (
 new_file,
 Timestamp.of(os.path.getmtime(os.path.join(
 element, file))),
 )
 )
 return new_files

 def _process_new_files(
 self,
 tracker: DirectoryWatchRestrictionTracker,
 watermark_estimater: ManualWatermarkEstimator,
 new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
 ):

Re: Pipeline gets stuck when chaining two SDFs (Python SDK)

2024-05-05 Thread XQ Hu via user
Have you tried using other runners? I think this might be caused by some
gaps in the Python DirectRunner's support for streaming cases or SDFs.
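For reference, a minimal sketch of pointing such a pipeline at the embedded
Flink runner instead of the Python DirectRunner (option values are examples only):

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=FlinkRunner",  # starts an embedded Flink cluster if no master is given
    "--streaming",
    "--parallelism=1",
])
# pipeline = beam.Pipeline(options=options)  # then build the chained SDFs as before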

On Sun, May 5, 2024 at 5:10 AM Jaehyeon Kim  wrote:

> Hi XQ
>
> Thanks for checking it out. SDFs chaining seems to work as I created my
> pipeline while converting a pipeline that is built in the Java SDK. The
> source of the Java pipeline can be found in
> https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java
>
> So far, when I yield outputs, the second SDF gets stuck while it gets
> executed if I return them (but the first SDF completes). If I change the
> second SDF into a do function without adding the tracker, it is executed
> fine. Not sure what happens in the first scenario.
>
> Cheers,
> Jaehyeon
>
> On Sun, 5 May 2024 at 09:21, XQ Hu via user  wrote:
>
>> I played with your example. Indeed, create_tracker in your ProcessFilesFn
>> is never called, which is quite strange.
>> I could not find any example that shows the chained SDFs, which makes me
>> wonder whether the chained SDFs work.
>>
>> @Chamikara Jayalath  Any thoughts?
>>
>> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim  wrote:
>>
>>> Hello,
>>>
>>> I am building a pipeline using two SDFs that are chained. The first
>>> function (DirectoryWatchFn) checks a folder continuously and grabs if a new
>>> file is added. The second one (ProcessFilesFn) processes a file
>>> while splitting each line - the processing simply prints the file name and
>>> line number.
>>>
>>> The process function of the first SDF gets stuck if I yield a new file
>>> object. Specifically, although the second SDF is called as I can check the
>>> initial restriction is created, the tracker is not created at all!
>>>
>>> On the other hand, if I return the file object list, the second SDF
>>> works fine but the issue is the first SDF stops as soon as it returns the
>>> first list of files.
>>>
>>> The source of the pipeline can be found in
>>> - First SDF:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>>> - Second SDF:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>>> - Pipeline:
>>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>>
>>> Can you please inform me how to handle this issue?
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>> class DirectoryWatchFn(beam.DoFn):
>>> POLL_TIMEOUT = 10
>>>
>>> @beam.DoFn.unbounded_per_element()
>>> def process(
>>> self,
>>> element: str,
>>> tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>>> DirectoryWatchRestrictionProvider()
>>> ),
>>> watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.
>>> WatermarkEstimatorParam(
>>> DirectoryWatchWatermarkEstimatorProvider()
>>> ),
>>> ) -> typing.Iterable[MyFile]:
>>> new_files = self._get_new_files_if_any(element, tracker)
>>> if self._process_new_files(tracker, watermark_estimater,
>>> new_files):
>>> # return [new_file[0] for new_file in new_files] #<-- it
>>> doesn't get stuck but the SDF finishes
>>> for new_file in new_files: #<--- it gets stuck if yielding
>>> file objects
>>> yield new_file[0]
>>> else:
>>> return
>>> tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>>
>>> def _get_new_files_if_any(
>>> self, element: str, tracker: DirectoryWatchRestrictionTracker
>>> ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>>> new_files = []
>>> for file in os.listdir(element):
>>> if (
>>> os.path.isfile(os.path.join(element, file))
>>> and file not in tracker.current_restriction().
>>> already_processed
>>> ):
>>> num_lines = sum(1 for _ in open(os.path.join(element,
>>> file)))
>>> new_file = MyFile(file, 0, num_lines)
>>> print(new_file)
>>> new_files.append(
>>> (
>>> new_file,
>>> Timestamp.of(os.path.getmtime(os.path.join(
>>> element, file))),
>>> )
>>> )
>>> return new_files
>>>
>>> def _process_new_files(
>>> self,
>>> tracker: DirectoryWatchRestrictionTracker,
>>> watermark_estimater: ManualWatermarkEstimator,
>>> new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>>> ):
>>> max_instance = watermark_estimater.current_watermark()
>>> for new_file in new_files:
>>> if tracker.try_claim(new_file[0].name) is False:
>>> watermark_estimater.set_watermark(max_instance)
>>> return False

Re: Pipeline gets stuck when chaining two SDFs (Python SDK)

2024-05-05 Thread Jaehyeon Kim
Hi XQ

Thanks for checking it out. SDF chaining seems like it should work, as I created my
pipeline by converting a pipeline built with the Java SDK. The
source of the Java pipeline can be found at
https://github.com/PacktPublishing/Building-Big-Data-Pipelines-with-Apache-Beam/blob/main/chapter7/src/main/java/com/packtpub/beam/chapter7/StreamingFileRead.java

So far, when I yield outputs, the second SDF gets stuck, while it gets
executed if I return them (but then the first SDF completes). If I change the
second SDF into a plain DoFn without the tracker, it is executed
fine. Not sure what happens in the first scenario.

Cheers,
Jaehyeon

On Sun, 5 May 2024 at 09:21, XQ Hu via user  wrote:

> I played with your example. Indeed, create_tracker in your ProcessFilesFn
> is never called, which is quite strange.
> I could not find any example that shows the chained SDFs, which makes me
> wonder whether the chained SDFs work.
>
> @Chamikara Jayalath  Any thoughts?
>
> On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim  wrote:
>
>> Hello,
>>
>> I am building a pipeline using two SDFs that are chained. The first
>> function (DirectoryWatchFn) checks a folder continuously and grabs if a new
>> file is added. The second one (ProcessFilesFn) processes a file
>> while splitting each line - the processing simply prints the file name and
>> line number.
>>
>> The process function of the first SDF gets stuck if I yield a new file
>> object. Specifically, although the second SDF is called as I can check the
>> initial restriction is created, the tracker is not created at all!
>>
>> On the other hand, if I return the file object list, the second SDF works
>> fine but the issue is the first SDF stops as soon as it returns the first
>> list of files.
>>
>> The source of the pipeline can be found in
>> - First SDF:
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
>> - Second SDF:
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
>> - Pipeline:
>> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>>
>> Can you please inform me how to handle this issue?
>>
>> Cheers,
>> Jaehyeon
>>
>> class DirectoryWatchFn(beam.DoFn):
>> POLL_TIMEOUT = 10
>>
>> @beam.DoFn.unbounded_per_element()
>> def process(
>> self,
>> element: str,
>> tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
>> DirectoryWatchRestrictionProvider()
>> ),
>> watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.
>> WatermarkEstimatorParam(
>> DirectoryWatchWatermarkEstimatorProvider()
>> ),
>> ) -> typing.Iterable[MyFile]:
>> new_files = self._get_new_files_if_any(element, tracker)
>> if self._process_new_files(tracker, watermark_estimater,
>> new_files):
>> # return [new_file[0] for new_file in new_files] #<-- it
>> doesn't get stuck but the SDF finishes
>> for new_file in new_files: #<--- it gets stuck if yielding
>> file objects
>> yield new_file[0]
>> else:
>> return
>> tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>>
>> def _get_new_files_if_any(
>> self, element: str, tracker: DirectoryWatchRestrictionTracker
>> ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
>> new_files = []
>> for file in os.listdir(element):
>> if (
>> os.path.isfile(os.path.join(element, file))
>> and file not in tracker.current_restriction().
>> already_processed
>> ):
>> num_lines = sum(1 for _ in open(os.path.join(element,
>> file)))
>> new_file = MyFile(file, 0, num_lines)
>> print(new_file)
>> new_files.append(
>> (
>> new_file,
>> Timestamp.of(os.path.getmtime(os.path.join(
>> element, file))),
>> )
>> )
>> return new_files
>>
>> def _process_new_files(
>> self,
>> tracker: DirectoryWatchRestrictionTracker,
>> watermark_estimater: ManualWatermarkEstimator,
>> new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
>> ):
>> max_instance = watermark_estimater.current_watermark()
>> for new_file in new_files:
>> if tracker.try_claim(new_file[0].name) is False:
>> watermark_estimater.set_watermark(max_instance)
>> return False
>> if max_instance < new_file[1]:
>> max_instance = new_file[1]
>> watermark_estimater.set_watermark(max_instance)
>> return max_instance < MAX_TIMESTAMP
>>
>


Re: Pipeline gets stuck when chaining two SDFs (Python SDK)

2024-05-04 Thread XQ Hu via user
I played with your example. Indeed, create_tracker in your ProcessFilesFn
is never called, which is quite strange.
I could not find any example that shows the chained SDFs, which makes me
wonder whether the chained SDFs work.

@Chamikara Jayalath  Any thoughts?

On Fri, May 3, 2024 at 2:45 AM Jaehyeon Kim  wrote:

> Hello,
>
> I am building a pipeline using two SDFs that are chained. The first
> function (DirectoryWatchFn) checks a folder continuously and grabs if a new
> file is added. The second one (ProcessFilesFn) processes a file
> while splitting each line - the processing simply prints the file name and
> line number.
>
> The process function of the first SDF gets stuck if I yield a new file
> object. Specifically, although the second SDF is called as I can check the
> initial restriction is created, the tracker is not created at all!
>
> On the other hand, if I return the file object list, the second SDF works
> fine but the issue is the first SDF stops as soon as it returns the first
> list of files.
>
> The source of the pipeline can be found in
> - First SDF:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/directory_watch.py
> - Second SDF:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/file_read.py
> - Pipeline:
> https://github.com/jaehyeon-kim/beam-demos/blob/feature/beam-pipeline/beam-pipelines/chapter7/streaming_file_read.py
>
> Can you please inform me how to handle this issue?
>
> Cheers,
> Jaehyeon
>
> class DirectoryWatchFn(beam.DoFn):
> POLL_TIMEOUT = 10
>
> @beam.DoFn.unbounded_per_element()
> def process(
> self,
> element: str,
> tracker: RestrictionTrackerView = beam.DoFn.RestrictionParam(
> DirectoryWatchRestrictionProvider()
> ),
> watermark_estimater: WatermarkEstimatorProvider = beam.DoFn.
> WatermarkEstimatorParam(
> DirectoryWatchWatermarkEstimatorProvider()
> ),
> ) -> typing.Iterable[MyFile]:
> new_files = self._get_new_files_if_any(element, tracker)
> if self._process_new_files(tracker, watermark_estimater, new_files
> ):
> # return [new_file[0] for new_file in new_files] #<-- it
> doesn't get stuck but the SDF finishes
> for new_file in new_files: #<--- it gets stuck if yielding
> file objects
> yield new_file[0]
> else:
> return
> tracker.defer_remainder(Duration.of(self.POLL_TIMEOUT))
>
> def _get_new_files_if_any(
> self, element: str, tracker: DirectoryWatchRestrictionTracker
> ) -> typing.List[typing.Tuple[MyFile, Timestamp]]:
> new_files = []
> for file in os.listdir(element):
> if (
> os.path.isfile(os.path.join(element, file))
> and file not in tracker.current_restriction().
> already_processed
> ):
> num_lines = sum(1 for _ in open(os.path.join(element, file
> )))
> new_file = MyFile(file, 0, num_lines)
> print(new_file)
> new_files.append(
> (
> new_file,
> Timestamp.of(os.path.getmtime(os.path.join(element,
> file))),
> )
> )
> return new_files
>
> def _process_new_files(
> self,
> tracker: DirectoryWatchRestrictionTracker,
> watermark_estimater: ManualWatermarkEstimator,
> new_files: typing.List[typing.Tuple[MyFile, Timestamp]],
> ):
> max_instance = watermark_estimater.current_watermark()
> for new_file in new_files:
> if tracker.try_claim(new_file[0].name) is False:
> watermark_estimater.set_watermark(max_instance)
> return False
> if max_instance < new_file[1]:
> max_instance = new_file[1]
> watermark_estimater.set_watermark(max_instance)
> return max_instance < MAX_TIMESTAMP
>


Re: Query about `JdbcIO.PoolableDataSourceProvider`

2024-05-04 Thread Vardhan Thigle via user
Regards and Thanks,
Vardhan Thigle,
+919535346204
A small correction, I intended to link to JdbcIO.html



On Sat, May 4, 2024 at 5:48 PM Vardhan Thigle 
wrote:

> Hi Beam Experts,
>
> I had a small query about `JdbcIO.PoolableDataSourceProvider`
>
> As per the main documentation of JdbcIO,
> (IIUC) `JdbcIO.PoolableDataSourceProvider` creates one DataSource per
> execution thread by default, which can overwhelm the source db.
>
> Whereas, as per the Javadoc of JdbcIO.PoolableDataSourceProvider:
>
> At most a single DataSource instance will be constructed during pipeline
> execution for each unique JdbcIO.DataSourceConfiguration within
> the pipeline.
>
> If I want a singleton poolable connection for a given source database and
> my pipeline is dealing with multiple source databases, do I need to wrap
> the `JdbcIO.PoolableDataSourceProvider` in another concurrent hash map
> (from the implementation it looks like that's what it does already and it's
> not needed)? I am a bit confused due to the variation in the 2 docs above
> (it's quite possible that I am interpreting them wrong).
> Would it be more recommended to roll out a custom class, as suggested in the
> main documentation of JdbcIO, in cases like:
> 1. configuring the pool config, or
> 2. using an alternative source, say Hikari, which if I understand correctly
> is not possible with JdbcIO.PoolableDataSourceProvider.
>
>
>
>
> Regards and Thanks,
> Vardhan Thigle,
> +919535346204 <+91%2095353%2046204>
>


Re: Watermark progress halt in Python streaming pipelines

2024-04-30 Thread Wiśniowski Piotr

Hi,

Getting back with an update. After updating `grpcio` the issues are gone. 
Thank you for the solution and investigation. Feels like I owe you a 
beer :)


Best

Wiśniowski Piotr

On 24.04.2024 22:11, Valentyn Tymofieiev wrote:



On Wed, Apr 24, 2024 at 12:40 PM Wiśniowski Piotr 
 wrote:


Hi!

Thank you for the hint. We will try the mitigation from the
issue. We already tried everything from
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact
, but let's hope upgrading the dependency will help. I will
reply to this thread once I get confirmation.

BTW, great job on the investigation of the bug that you mentioned.
Impressive. Seems like a nasty one.

Thanks. I was specifically recommending you check the recently added 
content under "It might be possible to retrieve stacktraces of a 
thread that is holding the GIL on a running Dataflow worker as 
follows:", as that should help find out what is causing stuckness in 
your case. But hopefully it won't be necessary after you adjust the 
grpcio version.
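Not the procedure from the linked guide, but as a lightweight in-process
alternative (a sketch, assuming you can add a line at worker startup), the
standard-library faulthandler module can periodically dump all Python thread
stacks so the worker logs show where a stuck thread is spending its time:

import faulthandler
import sys

# Call once, e.g. from a DoFn.setup() or the worker entry point: every 300
# seconds, dump the stack of every Python thread to stderr.
faulthandler.dump_traceback_later(timeout=300, repeat=True, file=sys.stderr)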


Best,

Wiśniowski Piotr

On 24.04.2024 00:31, Valentyn Tymofieiev via user wrote:

You might be running into
https://github.com/apache/beam/issues/30867.

Among the error messages you mentioned, the following is closest
to the root cause: ``Error message from worker: generic::internal:
Error encountered with the status channel: There are 10
consecutive failures obtaining SDK worker status info from
sdk-0-0. The last success response was received 3h20m2.648304212s
ago at 2024-04-23T11:48:35.493682768+00:00. SDK worker appears to
be permanently unresponsive. Aborting the SDK. For more
information, see:

https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```


If mitigations in https://github.com/apache/beam/issues/30867
don't resolve your issue, please see

https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact
for instructions on how to find what causes the workers to be stuck.

Thanks!

On Tue, Apr 23, 2024 at 12:17 PM Wiśniowski Piotr
 wrote:

Hi,
We are investigating an issue with our Python SDK streaming
pipelines, and have few questions, but first context.
Our stack:
- Python SDK 2.54.0 but we tried also 2.55.1
- DataFlow Streaming engine with sdk in container image (we
tried also Prime)
- Currently our pipelines do have low enough traffic, so that
single node handles it most of the time, but occasionally we
do scale up.
- Deployment by Terraform `google_dataflow_flex_template_job`
resource, which normally does job update when re-applying
Terraform.
- We do use a lot `ReadModifyWriteStateSpec`, other states
and watermark timers, but we do keep a the size of state
under control.
- We do use custom coders as Pydantic avro.
The issue:
- Occasionally watermark progression stops. The issue is not
deterministic, and happens like 1-2 per day for few pipelines.
- No user code errors reported- but we do get errors like this:
```INTERNAL: The work item requesting state read is no longer
valid on the backend. The work has already completed or will
be retried. This is expected during autoscaling events.

[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]

{ trail_point { source_file_loc { filepath:
"dist_proc/windmill/client/streaming_rpc_client.cc" line: 767
} } }']```
```ABORTED: SDK harness sdk-0-0 disconnected. This usually
means that the process running the pipeline code has crashed.
Inspect the Worker Logs and the Diagnostics tab to determine
the cause of the crash.

[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]

{ trail_point { source_file_loc { filepath:
"dist_proc/dax/workflow/worker/fnapi_control_service.cc"
line: 217 } } } [dist_proc.dax.MessageCode] { origin_id:
5391582787251181999
[dist_proc.dax.workflow.workflow_io_message_ext]:
SDK_DISCONNECT }']```
```Work item for sharding key 8dd4578b4f280f5d tokens
(1316764909133315359, 17766288489530478880) encountered error
during processing, will be retried (possibly on another
worker): generic::internal: Error encountered with the status
channel: SDK harness sdk-0-0 disconnected. with MessageCode:
(93f1db2f7a4a325

Re: Hot update in dataflow without lossing messages

2024-04-28 Thread Wiśniowski Piotr

Hi,

I have pretty much the same setup. Regarding Terraform and Dataflow on GCP:

- Terraform apply checks whether there is a Dataflow job running with the 
same `job_name`.


- If there is not, it creates a new one and waits until it is in the 
"running" state.


- If there is one already, it tries to update the job: it creates a new job 
with the same "job_name" running the new version of the code and sends an 
"update" signal to the old one. After that, the old job halts and waits for 
the new one to fully start and take over the state of the old job. Once 
that's done the old job goes into the "updated" state, and the new one 
processes messages. If the new one fails, the old one resumes processing.


- Note that for this to work the new code has to be compatible with the 
old one. If it is not, the new job will fail, and the old job will fall 
slightly behind because it had to wait for the new job to fail.


- Note 2: there is a way to run a compatibility check only, so that the new 
job does not start but you still verify that it would be a compatible 
update, hence avoiding possible delays in the old job.


- Note 3: there is an entirely separate job update type called "in-flight 
update" which does not effectively change the job, but allows changing 
autoscaler parameters (like the max number of workers) without creating any 
delays in the pipeline.


Given the above context, more information is needed to fully diagnose your 
issue, but you might be hitting the issue mentioned by Robert:


- if you use a topic for PubSubIO, each new job creates a new subscription 
on the topic at graph construction time. This means that if there are 
messages in the old subscription that were not yet processed (and acked) by 
the old pipeline, and the old pipeline gets the "update" signal and halts, 
there may be a period when messages are published to the old subscription 
but not to the new one.


Workarounds:

- use a subscription on PubSubIO (see the sketch below), or

- use random job names in TF and drain the old pipelines.

Note all of the above is just a hypothesis, but hopefully it is helpful.
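A minimal sketch of the first workaround with the Python SDK (the project,
topic, and subscription names are placeholders, and `pipeline` stands for
your Pipeline object):

import apache_beam as beam

# Reading from a pre-created, shared subscription:
messages = pipeline | beam.io.ReadFromPubSub(
    subscription="projects/my-project/subscriptions/my-shared-subscription"
)

# Reading from a topic instead makes the job create its own subscription at
# graph construction time, which is the behaviour that can drop messages
# between the old and the new job:
# messages = pipeline | beam.io.ReadFromPubSub(
#     topic="projects/my-project/topics/my-topic")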

Best

Wiśniowski Piotr


On 16.04.2024 05:15, Juan Romero wrote:
The deployment of the job is made by Terraform. I verified, and it seems 
that Terraform does it incorrectly under the hood because it stops the 
current job and starts a new one. Thanks for the information!


On Mon, 15 Apr 2024 at 6:42 PM Robert Bradshaw via user 
 wrote:


Are you draining[1] your pipeline or simply canceling it and
starting a new one? Draining should close open windows and attempt
to flush all in-flight data before shutting down. For PubSub you
may also need to read from subscriptions rather than topics to
ensure messages are processed by either one or the other.

[1]
https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain

On Mon, Apr 15, 2024 at 9:33 AM Juan Romero  wrote:

Hi guys. Good morning.

I have done some tests with Apache Beam on Dataflow to see whether
I can do a hot update or hot swap while the pipeline is processing
a bunch of messages that fall in a time window of 10 minutes. What
I saw is that when I do a hot update on the pipeline while there
are some messages in the time window (before sending them to the
target), the current job is shut down and Dataflow creates a new
one. The problem is that I seem to be losing the messages that
were being processed in the old one; they are not taken over by
the new one, which implies we are losing data.

Can you help me or recommend any strategy to me?

Thanks!!


Re: Any recomendation for key for GroupIntoBatches

2024-04-28 Thread Wiśniowski Piotr

Hi,

Might be late to the discussion, but here is another option (I think it was 
not mentioned, or I missed it). Take a look at 
[BatchElements](https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements), 
as I think it is precisely what you want to achieve.


Compared to the other answers:

- it is elastic, so it fits any downstream use case

- no custom code - it is a native Beam transform

- no shuffling of the data is required, since elements are batched on the 
worker that already holds them (but pay attention to the max message size 
limit of your runner), whereas shuffling would be required when creating 
artificial random-looking keys.


Note that the above is Python, but I bet there is a Java counterpart (or at 
least one that is easy to implement); a minimal sketch follows below.
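
A minimal sketch of this approach, assuming the message IDs arrive as 
strings and a hypothetical call_endpoint helper that performs the batched 
lookup:

import apache_beam as beam

def call_endpoint(ids):
    # Hypothetical helper: send up to ~100 IDs in one request and return
    # the enriched records. Replace with the real HTTP call.
    return [{"id": i} for i in ids]

with beam.Pipeline() as p:
    (
        p
        | "Ids" >> beam.Create([str(n) for n in range(1000)])
        # BatchElements batches elements that are already on a worker;
        # no keys and no shuffle are needed.
        | "Batch" >> beam.BatchElements(min_batch_size=100, max_batch_size=100)
        | "Enrich" >> beam.FlatMap(call_endpoint)
    )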


Best

Wiśniowski Piotr


On 15.04.2024 19:14, Reuven Lax via user wrote:
There are various strategies. Here is an example of how Beam does it (taken 
from Reshuffle.viaRandomKey().withNumBuckets(N)).


Note that this does some extra hashing to work around issues with the 
Spark runner. If you don't care about that, you could implement 
something simpler (e.g. initialize shard to a random number in 
StartBundle, and increment it mod numBuckets in each processElement call).

public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private int shard;
  private @Nullable Integer numBuckets;

  public AssignShardFn(@Nullable Integer numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Setup
  public void setup() {
    shard = ThreadLocalRandom.current().nextInt();
  }

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
    ++shard;
    // Smear the shard into something more random-looking, to avoid issues
    // with runners that don't properly hash the key being shuffled, but rely
    // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
    // which for Integer is a no-op and it is an issue:
    // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
    // spark.html
    // This hashing strategy is copied from
    // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
    int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
    if (numBuckets != null) {
      UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
      hashOfShard =
          UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
    }
    r.output(KV.of(hashOfShard, element));
  }
}

On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas 
 wrote:


Good day, Ruben,

Would you be able to compute a shasum on the group of IDs to use
as the key?

Best,

Damon

On 2024/04/12 19:22:45 Ruben Vargas wrote:
> Hello guys
>
> Maybe this question was already answered, but I cannot find it  and
> want some more input on this topic.
>
> I have some messages that don't have any particular key candidate,
> except the ID,  but I don't want to use it because the idea is to
> group multiple IDs in the same batch.
>
> This is my use case:
>
> I have an endpoint where I'm gonna send the message ID, this
endpoint
> is gonna return me certain information which I will use to enrich my
> message. In order to avoid fetching the endpoint per message I
want to
> batch it in 100 and send the 100 IDs in one request ( the endpoint
> supports it) . I was thinking on using GroupIntoBatches.
>
> - If I choose the ID as the key, my understanding is that it won't
> work in the way I want (because it will form batches of the same
ID).
> - Use a constant will be a problem for parallelism, is that correct?
>
> Then my question is, what should I use as a key? Maybe something
> regarding the timestamp? so I can have groups of messages that
arrive
> at a certain second?
>
> Any suggestions would be appreciated
>
> Thanks.
>


Re: Watermark progress halt in Python streaming pipelines

2024-04-24 Thread Valentyn Tymofieiev via user
On Wed, Apr 24, 2024 at 12:40 PM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi!
>
> Thank you for the hint. We will try with the mitigation from the issue. We
> did already tried everything from
> https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact
> , but lets hope upgrading the dependency will help. Will keep reply to this
> thread once I get confirmation.
>
> BTW great job on the investigation of bug that you mentioned. Impressive.
> Seems like a nasty one.
>
Thanks. I was specifically recommending you check the recently added
content under "It might be possible to retrieve stacktraces of a thread
that is holding the GIL on a running Dataflow worker as follows:", as that
should help find out what is causing stuckness in your case. But hopefully
it won't be necessary after you adjust the grpcio version.



> Best,
>
> Wiśniowski Piotr
> On 24.04.2024 00:31, Valentyn Tymofieiev via user wrote:
>
> You might be running into https://github.com/apache/beam/issues/30867.
>
> Among the error messages you mentioned, the  following is closer to
> rootcause: ``Error message from worker: generic::internal: Error
> encountered with the status channel: There are 10 consecutive failures
> obtaining SDK worker status info from sdk-0-0. The last success response
> was received 3h20m2.648304212s ago at 2024-04-23T11:48:35.493682768+00:00.
> SDK worker appears to be permanently unresponsive. Aborting the SDK. For
> more information, see:
> https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```
>
> If mitigations in https://github.com/apache/beam/issues/30867 don't
> resolve your issue, please see
> https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact
> for instructions on how to find what causes the workers to be stuck.
>
> Thanks!
>
> On Tue, Apr 23, 2024 at 12:17 PM Wiśniowski Piotr <
> contact.wisniowskipi...@gmail.com> wrote:
>
>> Hi,
>> We are investigating an issue with our Python SDK streaming pipelines,
>> and have few questions, but first context.
>> Our stack:
>> - Python SDK 2.54.0 but we tried also 2.55.1
>> - DataFlow Streaming engine with sdk in container image (we tried also
>> Prime)
>> - Currently our pipelines do have low enough traffic, so that single
>> node handles it most of the time, but occasionally we do scale up.
>> - Deployment by Terraform `google_dataflow_flex_template_job` resource,
>> which normally does job update when re-applying Terraform.
>> - We do use a lot `ReadModifyWriteStateSpec`, other states and watermark
>> timers, but we do keep a the size of state under control.
>> - We do use custom coders as Pydantic avro.
>> The issue:
>> - Occasionally watermark progression stops. The issue is not
>> deterministic, and happens like 1-2 per day for few pipelines.
>> - No user code errors reported- but we do get errors like this:
>> ```INTERNAL: The work item requesting state read is no longer valid on
>> the backend. The work has already completed or will be retried. This is
>> expected during autoscaling events. [
>> type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]
>> { trail_point { source_file_loc { filepath:
>> "dist_proc/windmill/client/streaming_rpc_client.cc" line: 767 } } }']```
>> ```ABORTED: SDK harness sdk-0-0 disconnected. This usually means that the
>> process running the pipeline code has crashed. Inspect the Worker Logs and
>> the Diagnostics tab to determine the cause of the crash. [
>> type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]
>> { trail_point { source_file_loc { filepath:
>> "dist_proc/dax/workflow/worker/fnapi_control_service.cc" line: 217 } } }
>> [dist_proc.dax.MessageCode] { origin_id: 5391582787251181999
>> [dist_proc.dax.workflow.workflow_io_message_ext]: SDK_DISCONNECT }']```
>> ```Work item for sharding key 8dd4578b4f280f5d tokens
>> (1316764909133315359, 17766288489530478880) encountered error during
>> processing, will be retried (possibly on another worker):
>> generic::internal: Error encountered with the status channel: SDK harness
>> sdk-0-0 disconnected. with MessageCode: (93f1db2f7a4a325c): SDK
>> disconnect.```
>> ```Python (worker sdk-0-0_sibling_1) exited 1 times: signal: segmentation
>> 

Re: Watermark progress halt in Python streaming pipelines

2024-04-24 Thread Wiśniowski Piotr

Hi!

Thank you for the hint. We will try the mitigation from the issue. We had 
already tried everything from 
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact, 
but let's hope upgrading the dependency will help. I will reply to this 
thread once I get confirmation.


BTW, great job on the investigation of the bug you mentioned. Impressive. 
Seems like a nasty one.


Best,

Wiśniowski Piotr

On 24.04.2024 00:31, Valentyn Tymofieiev via user wrote:

You might be running into https://github.com/apache/beam/issues/30867.

Among the error messages you mentioned, the  following is closer to 
rootcause: ``Error message from worker: generic::internal: Error 
encountered with the status channel: There are 10 consecutive failures 
obtaining SDK worker status info from sdk-0-0. The last success 
response was received 3h20m2.648304212s ago at 
2024-04-23T11:48:35.493682768+00:00. SDK worker appears to be 
permanently unresponsive. Aborting the SDK. For more information, see: 
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact``` 


If mitigations in https://github.com/apache/beam/issues/30867 don't 
resolve your issue, please see 
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact 
for instructions on how to find what causes the workers to be stuck.


Thanks!

On Tue, Apr 23, 2024 at 12:17 PM Wiśniowski Piotr 
 wrote:


Hi,
We are investigating an issue with our Python SDK streaming
pipelines, and have few questions, but first context.
Our stack:
- Python SDK 2.54.0 but we tried also 2.55.1
- DataFlow Streaming engine with sdk in container image (we tried
also Prime)
- Currently our pipelines do have low enough traffic, so that
single node handles it most of the time, but occasionally we do
scale up.
- Deployment by Terraform `google_dataflow_flex_template_job`
resource, which normally does job update when re-applying Terraform.
- We do use `ReadModifyWriteStateSpec` a lot, as well as other states and
watermark timers, but we do keep the size of the state under control.
- We do use custom coders as Pydantic avro.
The issue:
- Occasionally watermark progression stops. The issue is not
deterministic, and happens like 1-2 per day for few pipelines.
- No user code errors reported- but we do get errors like this:
```INTERNAL: The work item requesting state read is no longer
valid on the backend. The work has already completed or will be
retried. This is expected during autoscaling events.

[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]

{ trail_point { source_file_loc { filepath:
"dist_proc/windmill/client/streaming_rpc_client.cc" line: 767 } }
}']```
```ABORTED: SDK harness sdk-0-0 disconnected. This usually means
that the process running the pipeline code has crashed. Inspect
the Worker Logs and the Diagnostics tab to determine the cause of
the crash.

[type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]

{ trail_point { source_file_loc { filepath:
"dist_proc/dax/workflow/worker/fnapi_control_service.cc" line: 217
} } } [dist_proc.dax.MessageCode] { origin_id: 5391582787251181999
[dist_proc.dax.workflow.workflow_io_message_ext]: SDK_DISCONNECT
}']```
```Work item for sharding key 8dd4578b4f280f5d tokens
(1316764909133315359, 17766288489530478880) encountered error
during processing, will be retried (possibly on another worker):
generic::internal: Error encountered with the status channel: SDK
harness sdk-0-0 disconnected. with MessageCode:
(93f1db2f7a4a325c): SDK disconnect.```
```Python (worker sdk-0-0_sibling_1) exited 1 times: signal:
segmentation fault (core dumped) restarting SDK process```
- We did manage to correlate this with either vertical autoscaling
event (when using Prime) or other worker replacements done by
Dataflow under the hood, but this is not deterministic.
- For few hours watermark progress does stop, but other workers do
process messages.
- and after few hours:
```Error message from worker: generic::internal: Error encountered
with the status channel: There are 10 consecutive failures
obtaining SDK worker status info from sdk-0-0. The last success
response was received 3h20m2.648304212s ago at
2024-04-23T11:48:35.493682768+00:00. SDK worker appears to be
permanently unresponsive. Aborting the SDK. For more information,
see:

https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```
   

Re: Watermark progress halt in Python streaming pipelines

2024-04-23 Thread Valentyn Tymofieiev via user
You might be running into https://github.com/apache/beam/issues/30867.

Among the error messages you mentioned, the following is closer to the
root cause: ```Error message from worker: generic::internal: Error
encountered with the status channel: There are 10 consecutive failures
obtaining SDK worker status info from sdk-0-0. The last success response
was received 3h20m2.648304212s ago at 2024-04-23T11:48:35.493682768+00:00.
SDK worker appears to be permanently unresponsive. Aborting the SDK. For
more information, see:
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact```

If mitigations in https://github.com/apache/beam/issues/30867 don't resolve
your issue, please see
https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact
for instructions on how to find what causes the workers to be stuck.

Thanks!

On Tue, Apr 23, 2024 at 12:17 PM Wiśniowski Piotr <
contact.wisniowskipi...@gmail.com> wrote:

> Hi,
> We are investigating an issue with our Python SDK streaming pipelines, and
> have few questions, but first context.
> Our stack:
> - Python SDK 2.54.0 but we tried also 2.55.1
> - DataFlow Streaming engine with sdk in container image (we tried also
> Prime)
> - Currently our pipelines do have low enough traffic, so that single node
> handles it most of the time, but occasionally we do scale up.
> - Deployment by Terraform `google_dataflow_flex_template_job` resource,
> which normally does job update when re-applying Terraform.
> - We do use a lot `ReadModifyWriteStateSpec`, other states and watermark
> timers, but we do keep a the size of state under control.
> - We do use custom coders as Pydantic avro.
> The issue:
> - Occasionally watermark progression stops. The issue is not
> deterministic, and happens like 1-2 per day for few pipelines.
> - No user code errors reported- but we do get errors like this:
> ```INTERNAL: The work item requesting state read is no longer valid on the
> backend. The work has already completed or will be retried. This is
> expected during autoscaling events. [
> type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]
> { trail_point { source_file_loc { filepath:
> "dist_proc/windmill/client/streaming_rpc_client.cc" line: 767 } } }']```
> ```ABORTED: SDK harness sdk-0-0 disconnected. This usually means that the
> process running the pipeline code has crashed. Inspect the Worker Logs and
> the Diagnostics tab to determine the cause of the crash. [
> type.googleapis.com/util.MessageSetPayload='[dist_proc.dax.internal.TrailProto]
> { trail_point { source_file_loc { filepath:
> "dist_proc/dax/workflow/worker/fnapi_control_service.cc" line: 217 } } }
> [dist_proc.dax.MessageCode] { origin_id: 5391582787251181999
> [dist_proc.dax.workflow.workflow_io_message_ext]: SDK_DISCONNECT }']```
> ```Work item for sharding key 8dd4578b4f280f5d tokens
> (1316764909133315359, 17766288489530478880) encountered error during
> processing, will be retried (possibly on another worker):
> generic::internal: Error encountered with the status channel: SDK harness
> sdk-0-0 disconnected. with MessageCode: (93f1db2f7a4a325c): SDK
> disconnect.```
> ```Python (worker sdk-0-0_sibling_1) exited 1 times: signal: segmentation
> fault (core dumped) restarting SDK process```
> - We did manage to correlate this with either vertical autoscaling event
> (when using Prime) or other worker replacements done by Dataflow under the
> hood, but this is not deterministic.
> - For few hours watermark progress does stop, but other workers do
> process messages.
> - and after few hours:
> ```Error message from worker: generic::internal: Error encountered with
> the status channel: There are 10 consecutive failures obtaining SDK worker
> status info from sdk-0-0. The last success response was received
> 3h20m2.648304212s ago at 2024-04-23T11:48:35.493682768+00:00. SDK worker
> appears to be permanently unresponsive. Aborting the SDK. For more
> information, see:
> https://cloud.google.com/dataflow/docs/guides/common-errors#worker-lost-contact
> ```
> - And the pipeline starts to catch up and watermark progresses again.
> - Job update by Terraform apply also fixes the issue.
> - We do not see any extensive use of worker memory nor disk. CPU
> utilization is also most of the time close to idle. I do not think we do
> use C/C++ code with python. Nor use parallelism/threads outside beam
> parallelization.
> Questions:
> 1. What could be potential causes of such behavior? How to get more
> insights to this problem?
> 2. I have seen `In Pyth

Re: Hot update in dataflow without lossing messages

2024-04-15 Thread Juan Romero
The deployment of the job is made by Terraform. I verified, and it seems that
Terraform does it incorrectly under the hood because it stops the current job
and starts a new one. Thanks for the information!

On Mon, 15 Apr 2024 at 6:42 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> Are you draining[1] your pipeline or simply canceling it and starting a
> new one? Draining should close open windows and attempt to flush all
> in-flight data before shutting down. For PubSub you may also need to read
> from subscriptions rather than topics to ensure messages are processed by
> either one or the other.
>
> [1]
> https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain
>
> On Mon, Apr 15, 2024 at 9:33 AM Juan Romero  wrote:
>
>> Hi guys. Good morning.
>>
>> I haven't done some test in apache beam over data flow in order to see if
>> i can do an hot update or hot swap meanwhile the pipeline is processing a
>> bunch of messages that fall in a time window of 10 minutes. What I saw is
>> that when I do a hot update over the pipeline and currently there are some
>> messages in the time window (before sending them to the target), the
>> current job is shutdown and dataflow creates a new one. The problem is that
>> it seems that I am losing the messages that were being processed in the old
>> one and they are not taken by the new one, which imply we are incurring in
>> losing data .
>>
>> Can you help me or recommend any strategy to me?
>>
>> Thanks!!
>>
>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Robert Bradshaw via user
On Fri, Apr 12, 2024 at 1:39 PM Ruben Vargas 
wrote:

> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >
> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >
> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > PYTHON - ord(id[0]) % 100
>

or abs(hash(id)) % 100, in case the first character of your id is not well
distributed.


> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>
> >
> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >>
> >> How about just keeping track of a buffer and flush the buffer after 100
> messages and if there is a buffer on finish_bundle as well?
> >>
> >>
>
> If this is in memory, It could lead to potential loss of data. That is
> why the state is used or at least that is my understanding. but maybe
> there is a way to do this in the state?
>

Bundles are the unit of commitment in Beam [1], so finish_bundle won't drop
any data. A possible downside is that, especially in streaming, bundles may
be small, which would cap the amount of batching you get.

[1]
https://beam.apache.org/documentation/runtime/model/#bundling-and-persistence
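
A minimal sketch of that buffering pattern (an in-memory buffer flushed every
100 elements and again in finish_bundle), assuming the global window and a
hypothetical fetch_batch helper for the batched endpoint call:

import apache_beam as beam
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.windowed_value import WindowedValue

def fetch_batch(ids):
    # Hypothetical batched lookup against the external endpoint.
    return [{"id": i} for i in ids]

class BufferedEnrichFn(beam.DoFn):
    """Buffers up to 100 elements and flushes the remainder in finish_bundle."""
    BATCH_SIZE = 100

    def start_bundle(self):
        self._buffer = []

    def process(self, element):
        self._buffer.append(element)
        if len(self._buffer) >= self.BATCH_SIZE:
            yield from self._flush()

    def finish_bundle(self):
        # The bundle is only committed after finish_bundle returns, so
        # flushing the leftover buffer here does not risk data loss.
        for enriched in self._flush():
            # finish_bundle must emit WindowedValues; the global window is
            # assumed here for simplicity.
            yield WindowedValue(enriched, MIN_TIMESTAMP, [GlobalWindow()])

    def _flush(self):
        ids, self._buffer = self._buffer, []
        return fetch_batch(ids) if ids else []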


> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
> >>>
> >>> Hello guys
> >>>
> >>> Maybe this question was already answered, but I cannot find it  and
> >>> want some more input on this topic.
> >>>
> >>> I have some messages that don't have any particular key candidate,
> >>> except the ID,  but I don't want to use it because the idea is to
> >>> group multiple IDs in the same batch.
> >>>
> >>> This is my use case:
> >>>
> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
> >>> is gonna return me certain information which I will use to enrich my
> >>> message. In order to avoid fetching the endpoint per message I want to
> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
> >>> supports it) . I was thinking on using GroupIntoBatches.
> >>>
> >>> - If I choose the ID as the key, my understanding is that it won't
> >>> work in the way I want (because it will form batches of the same ID).
> >>> - Use a constant will be a problem for parallelism, is that correct?
> >>>
> >>> Then my question is, what should I use as a key? Maybe something
> >>> regarding the timestamp? so I can have groups of messages that arrive
> >>> at a certain second?
> >>>
> >>> Any suggestions would be appreciated
> >>>
> >>> Thanks.
>


Re: Hot update in dataflow without lossing messages

2024-04-15 Thread Robert Bradshaw via user
Are you draining[1] your pipeline or simply canceling it and starting a new
one? Draining should close open windows and attempt to flush all in-flight
data before shutting down. For PubSub you may also need to read from
subscriptions rather than topics to ensure messages are processed by either
one or the other.

[1] https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain

On Mon, Apr 15, 2024 at 9:33 AM Juan Romero  wrote:

> Hi guys. Good morning.
>
> I haven't done some test in apache beam over data flow in order to see if
> i can do an hot update or hot swap meanwhile the pipeline is processing a
> bunch of messages that fall in a time window of 10 minutes. What I saw is
> that when I do a hot update over the pipeline and currently there are some
> messages in the time window (before sending them to the target), the
> current job is shutdown and dataflow creates a new one. The problem is that
> it seems that I am losing the messages that were being processed in the old
> one and they are not taken by the new one, which imply we are incurring in
> losing data .
>
> Can you help me or recommend any strategy to me?
>
> Thanks!!
>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Reuven Lax via user
There are various strategies. Here is an example of how Beam does it (taken
from Reshuffle.viaRandomKey().withNumBuckets(N)).

Note that this does some extra hashing to work around issues with the Spark
runner. If you don't care about that, you could implement something simpler
(e.g. initialize shard to a random number in StartBundle, and increment it
mod numBuckets in each processElement call).

public static class AssignShardFn<T> extends DoFn<T, KV<Integer, T>> {
  private int shard;
  private @Nullable Integer numBuckets;

  public AssignShardFn(@Nullable Integer numBuckets) {
    this.numBuckets = numBuckets;
  }

  @Setup
  public void setup() {
    shard = ThreadLocalRandom.current().nextInt();
  }

  @ProcessElement
  public void processElement(@Element T element, OutputReceiver<KV<Integer, T>> r) {
    ++shard;
    // Smear the shard into something more random-looking, to avoid issues
    // with runners that don't properly hash the key being shuffled, but rely
    // on it being random-looking. E.g. Spark takes the Java hashCode() of keys,
    // which for Integer is a no-op and it is an issue:
    // http://hydronitrogen.com/poor-hash-partitioning-of-timestamps-integers-and-longs-in-
    // spark.html
    // This hashing strategy is copied from
    // org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Hashing.smear().
    int hashOfShard = 0x1b873593 * Integer.rotateLeft(shard * 0xcc9e2d51, 15);
    if (numBuckets != null) {
      UnsignedInteger unsignedNumBuckets = UnsignedInteger.fromIntBits(numBuckets);
      hashOfShard =
          UnsignedInteger.fromIntBits(hashOfShard).mod(unsignedNumBuckets).intValue();
    }
    r.output(KV.of(hashOfShard, element));
  }
}
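
A rough Python counterpart of the simpler variant described above (a random
shard initialized per bundle and incremented modulo the bucket count); the
class and names here are illustrative, not part of Beam:

import random
import apache_beam as beam

class AssignRandomShardFn(beam.DoFn):
    """Assigns a pseudo-random shard key so elements can be grouped evenly."""

    def __init__(self, num_buckets):
        self._num_buckets = num_buckets
        self._shard = 0

    def start_bundle(self):
        # Start each bundle at a random shard so workers don't all pile
        # onto the same keys.
        self._shard = random.randrange(self._num_buckets)

    def process(self, element):
        self._shard = (self._shard + 1) % self._num_buckets
        yield (self._shard, element)

The keyed output can then feed beam.GroupIntoBatches(100) to form batches of
roughly 100 elements without funnelling everything through a single key.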



On Mon, Apr 15, 2024 at 10:01 AM Damon Douglas 
wrote:

> Good day, Ruben,
>
> Would you be able to compute a shasum on the group of IDs to use as the
> key?
>
> Best,
>
> Damon
>
> On 2024/04/12 19:22:45 Ruben Vargas wrote:
> > Hello guys
> >
> > Maybe this question was already answered, but I cannot find it  and
> > want some more input on this topic.
> >
> > I have some messages that don't have any particular key candidate,
> > except the ID,  but I don't want to use it because the idea is to
> > group multiple IDs in the same batch.
> >
> > This is my use case:
> >
> > I have an endpoint where I'm gonna send the message ID, this endpoint
> > is gonna return me certain information which I will use to enrich my
> > message. In order to avoid fetching the endpoint per message I want to
> > batch it in 100 and send the 100 IDs in one request ( the endpoint
> > supports it) . I was thinking on using GroupIntoBatches.
> >
> > - If I choose the ID as the key, my understanding is that it won't
> > work in the way I want (because it will form batches of the same ID).
> > - Use a constant will be a problem for parallelism, is that correct?
> >
> > Then my question is, what should I use as a key? Maybe something
> > regarding the timestamp? so I can have groups of messages that arrive
> > at a certain second?
> >
> > Any suggestions would be appreciated
> >
> > Thanks.
> >
>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Damon Douglas
Good day, Ruben,

Would you be able to compute a shasum on the group of IDs to use as the key?

Best,

Damon

On 2024/04/12 19:22:45 Ruben Vargas wrote:
> Hello guys
> 
> Maybe this question was already answered, but I cannot find it  and
> want some more input on this topic.
> 
> I have some messages that don't have any particular key candidate,
> except the ID,  but I don't want to use it because the idea is to
> group multiple IDs in the same batch.
> 
> This is my use case:
> 
> I have an endpoint where I'm gonna send the message ID, this endpoint
> is gonna return me certain information which I will use to enrich my
> message. In order to avoid fetching the endpoint per message I want to
> batch it in 100 and send the 100 IDs in one request ( the endpoint
> supports it) . I was thinking on using GroupIntoBatches.
> 
> - If I choose the ID as the key, my understanding is that it won't
> work in the way I want (because it will form batches of the same ID).
> - Use a constant will be a problem for parallelism, is that correct?
> 
> Then my question is, what should I use as a key? Maybe something
> regarding the timestamp? so I can have groups of messages that arrive
> at a certain second?
> 
> Any suggestions would be appreciated
> 
> Thanks.
> 


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Yeah, unfortunately the data on the endpoint could change at any point
in time and I need to make sure I have the latest version :/

That limits my options here. But I also have other sources that can
benefit from this caching :)

Thank you very much!

On Mon, Apr 15, 2024 at 9:37 AM XQ Hu  wrote:
>
> I am not sure you still need to do batching since Web API can handle caching.
>
> If you really need it, I think GoupIntoBatches is a good way to go.
>
> On Mon, Apr 15, 2024 at 11:30 AM Ruben Vargas  wrote:
>>
>> Is there a way to do batching in that transformation? I'm assuming for
>> now no. or may be using in conjuntion with GoupIntoBatches
>>
>> On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas  wrote:
>> >
>> > Interesting
>> >
>> > I think the cache feature could be interesting for some use cases I have.
>> >
>> > On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
>> > >
>> > > For the new web API IO, the page lists these features:
>> > >
>> > > developers provide minimal code that invokes Web API endpoint
>> > > delegate to the transform to handle request retries and exponential 
>> > > backoff
>> > > optional caching of request and response associations
>> > > optional metrics
>> > >
>> > >
>> > > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas  
>> > > wrote:
>> > >>
>> > >> That one looks interesting
>> > >>
>> > >> What is not clear to me is what are the advantages of using it? Is
>> > >> only the error/retry handling? anything in terms of performance?
>> > >>
>> > >> My PCollection is unbounded but I was thinking of sending my messages
>> > >> in batches to the external API in order to gain some performance
>> > >> (don't expect to send 1 http request per message).
>> > >>
>> > >> Thank you very much for all your responses!
>> > >>
>> > >>
>> > >> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  
>> > >> wrote:
>> > >> >
>> > >> > To enrich your data, have you checked 
>> > >> > https://cloud.google.com/dataflow/docs/guides/enrichment?
>> > >> >
>> > >> > This transform is built on top of 
>> > >> > https://beam.apache.org/documentation/io/built-in/webapis/
>> > >> >
>> > >> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas 
>> > >> >  wrote:
>> > >> >>
>> > >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  
>> > >> >> wrote:
>> > >> >> >
>> > >> >> > Here is an example from a book that I'm reading now and it may be 
>> > >> >> > applicable.
>> > >> >> >
>> > >> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
>> > >> >> > PYTHON - ord(id[0]) % 100
>> > >> >>
>> > >> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>> > >> >>
>> > >> >> >
>> > >> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
>> > >> >> >  wrote:
>> > >> >> >>
>> > >> >> >> How about just keeping track of a buffer and flush the buffer 
>> > >> >> >> after 100 messages and if there is a buffer on finish_bundle as 
>> > >> >> >> well?
>> > >> >> >>
>> > >> >> >>
>> > >> >>
>> > >> >> If this is in memory, It could lead to potential loss of data. That 
>> > >> >> is
>> > >> >> why the state is used or at least that is my understanding. but maybe
>> > >> >> there is a way to do this in the state?
>> > >> >>
>> > >> >>
>> > >> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
>> > >> >> >>  wrote:
>> > >> >> >>>
>> > >> >> >>> Hello guys
>> > >> >> >>>
>> > >> >> >>> Maybe this question was already answered, but I cannot find it  
>> > >> >> >>> and
>> > >> >> >>> want some more input on this topic.
>> > >> >> >>>
>> > >> >> >>> I have some messages that don't have any particular key 
>> > >> >> >>> candidate,
>> > >> >> >>> except the ID,  but I don't want to use it because the idea is to
>> > >> >> >>> group multiple IDs in the same batch.
>> > >> >> >>>
>> > >> >> >>> This is my use case:
>> > >> >> >>>
>> > >> >> >>> I have an endpoint where I'm gonna send the message ID, this 
>> > >> >> >>> endpoint
>> > >> >> >>> is gonna return me certain information which I will use to 
>> > >> >> >>> enrich my
>> > >> >> >>> message. In order to avoid fetching the endpoint per message I 
>> > >> >> >>> want to
>> > >> >> >>> batch it in 100 and send the 100 IDs in one request ( the 
>> > >> >> >>> endpoint
>> > >> >> >>> supports it) . I was thinking on using GroupIntoBatches.
>> > >> >> >>>
>> > >> >> >>> - If I choose the ID as the key, my understanding is that it 
>> > >> >> >>> won't
>> > >> >> >>> work in the way I want (because it will form batches of the same 
>> > >> >> >>> ID).
>> > >> >> >>> - Use a constant will be a problem for parallelism, is that 
>> > >> >> >>> correct?
>> > >> >> >>>
>> > >> >> >>> Then my question is, what should I use as a key? Maybe something
>> > >> >> >>> regarding the timestamp? so I can have groups of messages that 
>> > >> >> >>> arrive
>> > >> >> >>> at a certain second?
>> > >> >> >>>
>> > >> >> >>> Any suggestions would be appreciated
>> > >> >> >>>
>> > >> >> >>> Thanks.


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread XQ Hu via user
I am not sure you still need to do batching, since the Web API transform can
handle caching.

If you really need it, I think GroupIntoBatches is a good way to go; a
minimal sketch follows below.
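
A minimal end-to-end sketch of that approach, assuming string message IDs
and a hypothetical enrich_batch call to the external endpoint:

import apache_beam as beam

def enrich_batch(keyed_batch):
    # keyed_batch is (shard_key, [id, id, ...]); the endpoint call itself is
    # hypothetical and should be replaced with the real batched request.
    _, ids = keyed_batch
    return [{"id": i} for i in ids]

with beam.Pipeline() as p:
    (
        p
        | "Ids" >> beam.Create([f"msg-{n}" for n in range(1000)])
        # Spread messages over 100 synthetic keys so batching stays parallel.
        | "Key" >> beam.Map(lambda msg_id: (abs(hash(msg_id)) % 100, msg_id))
        | "Batch" >> beam.GroupIntoBatches(100)
        | "Enrich" >> beam.FlatMap(enrich_batch)
    )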

On Mon, Apr 15, 2024 at 11:30 AM Ruben Vargas 
wrote:

> Is there a way to do batching in that transformation? I'm assuming for
> now no. or may be using in conjuntion with GoupIntoBatches
>
> On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas 
> wrote:
> >
> > Interesting
> >
> > I think the cache feature could be interesting for some use cases I have.
> >
> > On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
> > >
> > > For the new web API IO, the page lists these features:
> > >
> > > developers provide minimal code that invokes Web API endpoint
> > > delegate to the transform to handle request retries and exponential
> backoff
> > > optional caching of request and response associations
> > > optional metrics
> > >
> > >
> > > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas 
> wrote:
> > >>
> > >> That one looks interesting
> > >>
> > >> What is not clear to me is what are the advantages of using it? Is
> > >> only the error/retry handling? anything in terms of performance?
> > >>
> > >> My PCollection is unbounded but I was thinking of sending my messages
> > >> in batches to the external API in order to gain some performance
> > >> (don't expect to send 1 http request per message).
> > >>
> > >> Thank you very much for all your responses!
> > >>
> > >>
> > >> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user 
> wrote:
> > >> >
> > >> > To enrich your data, have you checked
> https://cloud.google.com/dataflow/docs/guides/enrichment?
> > >> >
> > >> > This transform is built on top of
> https://beam.apache.org/documentation/io/built-in/webapis/
> > >> >
> > >> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas <
> ruben.var...@metova.com> wrote:
> > >> >>
> > >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim 
> wrote:
> > >> >> >
> > >> >> > Here is an example from a book that I'm reading now and it may
> be applicable.
> > >> >> >
> > >> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > >> >> > PYTHON - ord(id[0]) % 100
> > >> >>
> > >> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
> > >> >>
> > >> >> >
> > >> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian <
> ged1...@gmail.com> wrote:
> > >> >> >>
> > >> >> >> How about just keeping track of a buffer and flush the buffer
> after 100 messages and if there is a buffer on finish_bundle as well?
> > >> >> >>
> > >> >> >>
> > >> >>
> > >> >> If this is in memory, It could lead to potential loss of data.
> That is
> > >> >> why the state is used or at least that is my understanding. but
> maybe
> > >> >> there is a way to do this in the state?
> > >> >>
> > >> >>
> > >> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas <
> ruben.var...@metova.com> wrote:
> > >> >> >>>
> > >> >> >>> Hello guys
> > >> >> >>>
> > >> >> >>> Maybe this question was already answered, but I cannot find
> it  and
> > >> >> >>> want some more input on this topic.
> > >> >> >>>
> > >> >> >>> I have some messages that don't have any particular key
> candidate,
> > >> >> >>> except the ID,  but I don't want to use it because the idea is
> to
> > >> >> >>> group multiple IDs in the same batch.
> > >> >> >>>
> > >> >> >>> This is my use case:
> > >> >> >>>
> > >> >> >>> I have an endpoint where I'm gonna send the message ID, this
> endpoint
> > >> >> >>> is gonna return me certain information which I will use to
> enrich my
> > >> >> >>> message. In order to avoid fetching the endpoint per message I
> want to
> > >> >> >>> batch it in 100 and send the 100 IDs in one request ( the
> endpoint
> > >> >> >>> supports it) . I was thinking on using GroupIntoBatches.
> > >> >> >>>
> > >> >> >>> - If I choose the ID as the key, my understanding is that it
> won't
> > >> >> >>> work in the way I want (because it will form batches of the
> same ID).
> > >> >> >>> - Use a constant will be a problem for parallelism, is that
> correct?
> > >> >> >>>
> > >> >> >>> Then my question is, what should I use as a key? Maybe
> something
> > >> >> >>> regarding the timestamp? so I can have groups of messages that
> arrive
> > >> >> >>> at a certain second?
> > >> >> >>>
> > >> >> >>> Any suggestions would be appreciated
> > >> >> >>>
> > >> >> >>> Thanks.
>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Is there a way to do batching in that transformation? I'm assuming for
now there is not, or maybe it can be used in conjunction with GroupIntoBatches.

On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas  wrote:
>
> Interesting
>
> I think the cache feature could be interesting for some use cases I have.
>
> On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
> >
> > For the new web API IO, the page lists these features:
> >
> > developers provide minimal code that invokes Web API endpoint
> > delegate to the transform to handle request retries and exponential backoff
> > optional caching of request and response associations
> > optional metrics
> >
> >
> > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas  
> > wrote:
> >>
> >> That one looks interesting
> >>
> >> What is not clear to me is what are the advantages of using it? Is
> >> only the error/retry handling? anything in terms of performance?
> >>
> >> My PCollection is unbounded but I was thinking of sending my messages
> >> in batches to the external API in order to gain some performance
> >> (don't expect to send 1 http request per message).
> >>
> >> Thank you very much for all your responses!
> >>
> >>
> >> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  
> >> wrote:
> >> >
> >> > To enrich your data, have you checked 
> >> > https://cloud.google.com/dataflow/docs/guides/enrichment?
> >> >
> >> > This transform is built on top of 
> >> > https://beam.apache.org/documentation/io/built-in/webapis/
> >> >
> >> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas  
> >> > wrote:
> >> >>
> >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >> >> >
> >> >> > Here is an example from a book that I'm reading now and it may be 
> >> >> > applicable.
> >> >> >
> >> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> >> >> > PYTHON - ord(id[0]) % 100
> >> >>
> >> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
> >> >>
> >> >> >
> >> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  
> >> >> > wrote:
> >> >> >>
> >> >> >> How about just keeping track of a buffer and flush the buffer after 
> >> >> >> 100 messages and if there is a buffer on finish_bundle as well?
> >> >> >>
> >> >> >>
> >> >>
> >> >> If this is in memory, It could lead to potential loss of data. That is
> >> >> why the state is used or at least that is my understanding. but maybe
> >> >> there is a way to do this in the state?
> >> >>
> >> >>
> >> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  
> >> >> >> wrote:
> >> >> >>>
> >> >> >>> Hello guys
> >> >> >>>
> >> >> >>> Maybe this question was already answered, but I cannot find it  and
> >> >> >>> want some more input on this topic.
> >> >> >>>
> >> >> >>> I have some messages that don't have any particular key candidate,
> >> >> >>> except the ID,  but I don't want to use it because the idea is to
> >> >> >>> group multiple IDs in the same batch.
> >> >> >>>
> >> >> >>> This is my use case:
> >> >> >>>
> >> >> >>> I have an endpoint where I'm gonna send the message ID, this 
> >> >> >>> endpoint
> >> >> >>> is gonna return me certain information which I will use to enrich my
> >> >> >>> message. In order to avoid fetching the endpoint per message I want 
> >> >> >>> to
> >> >> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
> >> >> >>> supports it) . I was thinking on using GroupIntoBatches.
> >> >> >>>
> >> >> >>> - If I choose the ID as the key, my understanding is that it won't
> >> >> >>> work in the way I want (because it will form batches of the same 
> >> >> >>> ID).
> >> >> >>> - Use a constant will be a problem for parallelism, is that correct?
> >> >> >>>
> >> >> >>> Then my question is, what should I use as a key? Maybe something
> >> >> >>> regarding the timestamp? so I can have groups of messages that 
> >> >> >>> arrive
> >> >> >>> at a certain second?
> >> >> >>>
> >> >> >>> Any suggestions would be appreciated
> >> >> >>>
> >> >> >>> Thanks.


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Interesting

I think the cache feature could be interesting for some use cases I have.

On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
>
> For the new web API IO, the page lists these features:
>
> developers provide minimal code that invokes Web API endpoint
> delegate to the transform to handle request retries and exponential backoff
> optional caching of request and response associations
> optional metrics
>
>
> On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas  wrote:
>>
>> That one looks interesting
>>
>> What is not clear to me is what are the advantages of using it? Is
>> only the error/retry handling? anything in terms of performance?
>>
>> My PCollection is unbounded but I was thinking of sending my messages
>> in batches to the external API in order to gain some performance
>> (don't expect to send 1 http request per message).
>>
>> Thank you very much for all your responses!
>>
>>
>> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  wrote:
>> >
>> > To enrich your data, have you checked 
>> > https://cloud.google.com/dataflow/docs/guides/enrichment?
>> >
>> > This transform is built on top of 
>> > https://beam.apache.org/documentation/io/built-in/webapis/
>> >
>> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas  
>> > wrote:
>> >>
>> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
>> >> >
>> >> > Here is an example from a book that I'm reading now and it may be 
>> >> > applicable.
>> >> >
>> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
>> >> > PYTHON - ord(id[0]) % 100
>> >>
>> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>> >>
>> >> >
>> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  
>> >> > wrote:
>> >> >>
>> >> >> How about just keeping track of a buffer and flush the buffer after 
>> >> >> 100 messages and if there is a buffer on finish_bundle as well?
>> >> >>
>> >> >>
>> >>
>> >> If this is in memory, It could lead to potential loss of data. That is
>> >> why the state is used or at least that is my understanding. but maybe
>> >> there is a way to do this in the state?
>> >>
>> >>
>> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  
>> >> >> wrote:
>> >> >>>
>> >> >>> Hello guys
>> >> >>>
>> >> >>> Maybe this question was already answered, but I cannot find it  and
>> >> >>> want some more input on this topic.
>> >> >>>
>> >> >>> I have some messages that don't have any particular key candidate,
>> >> >>> except the ID,  but I don't want to use it because the idea is to
>> >> >>> group multiple IDs in the same batch.
>> >> >>>
>> >> >>> This is my use case:
>> >> >>>
>> >> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
>> >> >>> is gonna return me certain information which I will use to enrich my
>> >> >>> message. In order to avoid fetching the endpoint per message I want to
>> >> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>> >> >>> supports it) . I was thinking on using GroupIntoBatches.
>> >> >>>
>> >> >>> - If I choose the ID as the key, my understanding is that it won't
>> >> >>> work in the way I want (because it will form batches of the same ID).
>> >> >>> - Use a constant will be a problem for parallelism, is that correct?
>> >> >>>
>> >> >>> Then my question is, what should I use as a key? Maybe something
>> >> >>> regarding the timestamp? so I can have groups of messages that arrive
>> >> >>> at a certain second?
>> >> >>>
>> >> >>> Any suggestions would be appreciated
>> >> >>>
>> >> >>> Thanks.


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread XQ Hu via user
For the new web API IO, the page lists these features:

   - developers provide minimal code that invokes Web API endpoint
   - delegate to the transform to handle request retries and exponential
   backoff
   - optional caching of request and response associations
   - optional metrics


On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas 
wrote:

> That one looks interesting
>
> What is not clear to me is what are the advantages of using it? Is
> only the error/retry handling? anything in terms of performance?
>
> My PCollection is unbounded but I was thinking of sending my messages
> in batches to the external API in order to gain some performance
> (don't expect to send 1 http request per message).
>
> Thank you very much for all your responses!
>
>
> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user 
> wrote:
> >
> > To enrich your data, have you checked
> https://cloud.google.com/dataflow/docs/guides/enrichment?
> >
> > This transform is built on top of
> https://beam.apache.org/documentation/io/built-in/webapis/
> >
> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas 
> wrote:
> >>
> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >> >
> >> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >> >
> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> >> > PYTHON - ord(id[0]) % 100
> >>
> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
> >>
> >> >
> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >> >>
> >> >> How about just keeping track of a buffer and flush the buffer after
> 100 messages and if there is a buffer on finish_bundle as well?
> >> >>
> >> >>
> >>
> >> If this is in memory, It could lead to potential loss of data. That is
> >> why the state is used or at least that is my understanding. but maybe
> >> there is a way to do this in the state?
> >>
> >>
> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
> >> >>>
> >> >>> Hello guys
> >> >>>
> >> >>> Maybe this question was already answered, but I cannot find it  and
> >> >>> want some more input on this topic.
> >> >>>
> >> >>> I have some messages that don't have any particular key candidate,
> >> >>> except the ID,  but I don't want to use it because the idea is to
> >> >>> group multiple IDs in the same batch.
> >> >>>
> >> >>> This is my use case:
> >> >>>
> >> >>> I have an endpoint where I'm gonna send the message ID, this
> endpoint
> >> >>> is gonna return me certain information which I will use to enrich my
> >> >>> message. In order to avoid fetching the endpoint per message I want
> to
> >> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
> >> >>> supports it) . I was thinking on using GroupIntoBatches.
> >> >>>
> >> >>> - If I choose the ID as the key, my understanding is that it won't
> >> >>> work in the way I want (because it will form batches of the same
> ID).
> >> >>> - Use a constant will be a problem for parallelism, is that correct?
> >> >>>
> >> >>> Then my question is, what should I use as a key? Maybe something
> >> >>> regarding the timestamp? so I can have groups of messages that
> arrive
> >> >>> at a certain second?
> >> >>>
> >> >>> Any suggestions would be appreciated
> >> >>>
> >> >>> Thanks.
>


Re: Any recomendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
That one looks interesting

What is not clear to me is: what are the advantages of using it? Is it
only the error/retry handling? Anything in terms of performance?

My PCollection is unbounded but I was thinking of sending my messages
in batches to the external API in order to gain some performance
(don't expect to send 1 http request per message).

Thank you very much for all your responses!


On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  wrote:
>
> To enrich your data, have you checked 
> https://cloud.google.com/dataflow/docs/guides/enrichment?
>
> This transform is built on top of 
> https://beam.apache.org/documentation/io/built-in/webapis/
>
> On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas  wrote:
>>
>> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
>> >
>> > Here is an example from a book that I'm reading now and it may be 
>> > applicable.
>> >
>> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
>> > PYTHON - ord(id[0]) % 100
>>
>> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>>
>> >
>> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  
>> > wrote:
>> >>
>> >> How about just keeping track of a buffer and flush the buffer after 100 
>> >> messages and if there is a buffer on finish_bundle as well?
>> >>
>> >>
>>
>> If this is in memory, It could lead to potential loss of data. That is
>> why the state is used or at least that is my understanding. but maybe
>> there is a way to do this in the state?
>>
>>
>> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  
>> >> wrote:
>> >>>
>> >>> Hello guys
>> >>>
>> >>> Maybe this question was already answered, but I cannot find it  and
>> >>> want some more input on this topic.
>> >>>
>> >>> I have some messages that don't have any particular key candidate,
>> >>> except the ID,  but I don't want to use it because the idea is to
>> >>> group multiple IDs in the same batch.
>> >>>
>> >>> This is my use case:
>> >>>
>> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
>> >>> is gonna return me certain information which I will use to enrich my
>> >>> message. In order to avoid fetching the endpoint per message I want to
>> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>> >>> supports it) . I was thinking on using GroupIntoBatches.
>> >>>
>> >>> - If I choose the ID as the key, my understanding is that it won't
>> >>> work in the way I want (because it will form batches of the same ID).
>> >>> - Use a constant will be a problem for parallelism, is that correct?
>> >>>
>> >>> Then my question is, what should I use as a key? Maybe something
>> >>> regarding the timestamp? so I can have groups of messages that arrive
>> >>> at a certain second?
>> >>>
>> >>> Any suggestions would be appreciated
>> >>>
>> >>> Thanks.


Re: Any recomendation for key for GroupIntoBatches

2024-04-14 Thread XQ Hu via user
To enrich your data, have you checked
https://cloud.google.com/dataflow/docs/guides/enrichment?

This transform is built on top of
https://beam.apache.org/documentation/io/built-in/webapis/

On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas 
wrote:

> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >
> > Here is an example from a book that I'm reading now and it may be
> applicable.
> >
> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> > PYTHON - ord(id[0]) % 100
>
> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>
> >
> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
> wrote:
> >>
> >> How about just keeping track of a buffer and flush the buffer after 100
> messages and if there is a buffer on finish_bundle as well?
> >>
> >>
>
> If this is in memory, It could lead to potential loss of data. That is
> why the state is used or at least that is my understanding. but maybe
> there is a way to do this in the state?
>
>
> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
> >>>
> >>> Hello guys
> >>>
> >>> Maybe this question was already answered, but I cannot find it  and
> >>> want some more input on this topic.
> >>>
> >>> I have some messages that don't have any particular key candidate,
> >>> except the ID,  but I don't want to use it because the idea is to
> >>> group multiple IDs in the same batch.
> >>>
> >>> This is my use case:
> >>>
> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
> >>> is gonna return me certain information which I will use to enrich my
> >>> message. In order to avoid fetching the endpoint per message I want to
> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
> >>> supports it) . I was thinking on using GroupIntoBatches.
> >>>
> >>> - If I choose the ID as the key, my understanding is that it won't
> >>> work in the way I want (because it will form batches of the same ID).
> >>> - Use a constant will be a problem for parallelism, is that correct?
> >>>
> >>> Then my question is, what should I use as a key? Maybe something
> >>> regarding the timestamp? so I can have groups of messages that arrive
> >>> at a certain second?
> >>>
> >>> Any suggestions would be appreciated
> >>>
> >>> Thanks.
>


Re: Any recomendation for key for GroupIntoBatches

2024-04-12 Thread Ruben Vargas
On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
>
> Here is an example from a book that I'm reading now and it may be applicable.
>
> JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> PYTHON - ord(id[0]) % 100

Maybe this is what I'm looking for. I'll give it a try. Thanks!

>
> On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  wrote:
>>
>> How about just keeping track of a buffer and flush the buffer after 100 
>> messages and if there is a buffer on finish_bundle as well?
>>
>>

If this is in memory, It could lead to potential loss of data. That is
why the state is used or at least that is my understanding. but maybe
there is a way to do this in the state?


>> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  wrote:
>>>
>>> Hello guys
>>>
>>> Maybe this question was already answered, but I cannot find it  and
>>> want some more input on this topic.
>>>
>>> I have some messages that don't have any particular key candidate,
>>> except the ID,  but I don't want to use it because the idea is to
>>> group multiple IDs in the same batch.
>>>
>>> This is my use case:
>>>
>>> I have an endpoint where I'm gonna send the message ID, this endpoint
>>> is gonna return me certain information which I will use to enrich my
>>> message. In order to avoid fetching the endpoint per message I want to
>>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>>> supports it) . I was thinking on using GroupIntoBatches.
>>>
>>> - If I choose the ID as the key, my understanding is that it won't
>>> work in the way I want (because it will form batches of the same ID).
>>> - Use a constant will be a problem for parallelism, is that correct?
>>>
>>> Then my question is, what should I use as a key? Maybe something
>>> regarding the timestamp? so I can have groups of messages that arrive
>>> at a certain second?
>>>
>>> Any suggestions would be appreciated
>>>
>>> Thanks.


Re: Any recomendation for key for GroupIntoBatches

2024-04-12 Thread Jaehyeon Kim
Here is an example from a book that I'm reading now and it may be
applicable.

JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
PYTHON - ord(id[0]) % 100

On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  wrote:

> How about just keeping track of a buffer and flush the buffer after 100
> messages and if there is a buffer on finish_bundle as well?
>
>
> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
> wrote:
>
>> Hello guys
>>
>> Maybe this question was already answered, but I cannot find it  and
>> want some more input on this topic.
>>
>> I have some messages that don't have any particular key candidate,
>> except the ID,  but I don't want to use it because the idea is to
>> group multiple IDs in the same batch.
>>
>> This is my use case:
>>
>> I have an endpoint where I'm gonna send the message ID, this endpoint
>> is gonna return me certain information which I will use to enrich my
>> message. In order to avoid fetching the endpoint per message I want to
>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>> supports it) . I was thinking on using GroupIntoBatches.
>>
>> - If I choose the ID as the key, my understanding is that it won't
>> work in the way I want (because it will form batches of the same ID).
>> - Use a constant will be a problem for parallelism, is that correct?
>>
>> Then my question is, what should I use as a key? Maybe something
>> regarding the timestamp? so I can have groups of messages that arrive
>> at a certain second?
>>
>> Any suggestions would be appreciated
>>
>> Thanks.
>>
>


Re: Any recomendation for key for GroupIntoBatches

2024-04-12 Thread George Dekermenjian
How about just keeping track of a buffer and flush the buffer after 100
messages and if there is a buffer on finish_bundle as well?


On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  wrote:

> Hello guys
>
> Maybe this question was already answered, but I cannot find it  and
> want some more input on this topic.
>
> I have some messages that don't have any particular key candidate,
> except the ID,  but I don't want to use it because the idea is to
> group multiple IDs in the same batch.
>
> This is my use case:
>
> I have an endpoint where I'm gonna send the message ID, this endpoint
> is gonna return me certain information which I will use to enrich my
> message. In order to avoid fetching the endpoint per message I want to
> batch it in 100 and send the 100 IDs in one request ( the endpoint
> supports it) . I was thinking on using GroupIntoBatches.
>
> - If I choose the ID as the key, my understanding is that it won't
> work in the way I want (because it will form batches of the same ID).
> - Use a constant will be a problem for parallelism, is that correct?
>
> Then my question is, what should I use as a key? Maybe something
> regarding the timestamp? so I can have groups of messages that arrive
> at a certain second?
>
> Any suggestions would be appreciated
>
> Thanks.
>


Re: Is Pulsar IO Connector Officially Supported?

2024-04-10 Thread XQ Hu via user
Sounds like a good idea to add a new section. Let me chat with the team
about that. Thanks.

On Wed, Apr 10, 2024 at 12:09 PM Ahmet Altay  wrote:

> Pulsar IO did not change much since it was originally added in 2022. You
> can read about the gaps in this presentation (
> https://2022.beamsummit.org/slides/Developing%20PulsarIO%20Connector.pdf)
> starting slide 52 (next steps). That might give you the background
> information to make an informed decision on whether it will be suitable for
> your use case or not.
>
> XQ -- For the connectors page, would it make sense to add a section for IO
> connectors that are in progress (not completely ready, but something is
> available) for visibility purposes?
>
> On Wed, Apr 10, 2024 at 8:58 AM Vince Castello via user <
> user@beam.apache.org> wrote:
>
>> I see that Beam 2.38.0 added support for reading from and writing to topics.
>> Based on your response, does this mean that we should not use the Pulsar IO
>> connector for production use? Let me know.
>>
>> Thanks
>>
>> --Vince
>>
>>
>> On Wed, Apr 10, 2024 at 9:58 AM XQ Hu  wrote:
>>
>>> I think PulsarIO needs more work to be polished.
>>>
>>> On Tue, Apr 9, 2024 at 2:48 PM Vince Castello via user <
>>> user@beam.apache.org> wrote:
>>>
 I see that a Pulsar connector was made available as of BEAM 2.38.0
 release but I don't see Pulsar as an official connector on the page below.
 Is the Pulsar IO connector official or not? If official then can someone
 please update the page since it gives the idea that a Pulsar IO connector
 is not available.


 https://beam.apache.org/documentation/io/connectors/


 Thanks


 --Vince






Re: Is Pulsar IO Connector Officially Supported?

2024-04-10 Thread Ahmet Altay via user
Pulsar IO did not change much since it was originally added in 2022. You
can read about the gaps in this presentation (
https://2022.beamsummit.org/slides/Developing%20PulsarIO%20Connector.pdf)
starting slide 52 (next steps). That might give you the background
information to make an informed decision on whether it will be suitable for
your use case or not.

XQ -- For the connectors page, would it make sense to add a section for IO
connectors that are in progress (not completely ready, but something is
available) for visibility purposes?

On Wed, Apr 10, 2024 at 8:58 AM Vince Castello via user <
user@beam.apache.org> wrote:

> I see that Beam 2.38.0 added support for reading from and writing to topics.
> Based on your response, does this mean that we should not use the Pulsar IO
> connector for production use? Let me know.
>
> Thanks
>
> --Vince
>
>
> On Wed, Apr 10, 2024 at 9:58 AM XQ Hu  wrote:
>
>> I think PulsarIO needs more work to be polished.
>>
>> On Tue, Apr 9, 2024 at 2:48 PM Vince Castello via user <
>> user@beam.apache.org> wrote:
>>
>>> I see that a Pulsar connector was made available as of BEAM 2.38.0
>>> release but I don't see Pulsar as an official connector on the page below.
>>> Is the Pulsar IO connector official or not? If official then can someone
>>> please update the page since it gives the idea that a Pulsar IO connector
>>> is not available.
>>>
>>>
>>> https://beam.apache.org/documentation/io/connectors/
>>>
>>>
>>> Thanks
>>>
>>>
>>> --Vince
>>>
>>>
>>>
>>>


Re: Is Pulsar IO Connector Officially Supported?

2024-04-10 Thread Vince Castello via user
I see that Beam 2.38.0 added support for reading from and writing to topics.
Based on your response, does this mean that we should not use the Pulsar IO
connector for production use? Let me know.

Thanks

--Vince


On Wed, Apr 10, 2024 at 9:58 AM XQ Hu  wrote:

> I think PulsarIO needs more work to be polished.
>
> On Tue, Apr 9, 2024 at 2:48 PM Vince Castello via user <
> user@beam.apache.org> wrote:
>
>> I see that a Pulsar connector was made available as of BEAM 2.38.0
>> release but I don't see Pulsar as an official connector on the page below.
>> Is the Pulsar IO connector official or not? If official then can someone
>> please update the page since it gives the idea that a Pulsar IO connector
>> is not available.
>>
>>
>> https://beam.apache.org/documentation/io/connectors/
>>
>>
>> Thanks
>>
>>
>> --Vince
>>
>>
>>
>>


Re: Is Pulsar IO Connector Officially Supported?

2024-04-10 Thread XQ Hu via user
I think PulsarIO needs more work to be polished.

On Tue, Apr 9, 2024 at 2:48 PM Vince Castello via user 
wrote:

> I see that a Pulsar connector was made available as of BEAM 2.38.0 release
> but I don't see Pulsar as an official connector on the page below. Is the
> Pulsar IO connector official or not? If official then can someone please
> update the page since it gives the idea that a Pulsar IO connector is not
> available.
>
>
> https://beam.apache.org/documentation/io/connectors/
>
>
> Thanks
>
>
> --Vince
>
>
>
>


Re: KV with AutoValueSchema

2024-04-09 Thread Ruben Vargas
Awesome, thanks for the info! It worked like a charm!

On Thu, Apr 4, 2024 at 9:49 PM Reuven Lax  wrote:
>
> There are ways you can manually force the coder here. However I would first 
> try to split up the KV creation into two operations. Have ProcessEvents just 
> create a PCollection<SharedCoreEvent>, and then a following operation to 
> create the KV. Something like this:
>
> input.apply(ParDo.of(new ProcessEvents()))
> .apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>) 
> ExtractKeyFunction).withKeyType(TypeDescriptors.longs()));
>
> I suspect that this will allow the mechanism to better infer the final Coder. 
> If that doesn't work, you could always brute force it like this:
>
> PCollection<SharedCoreEvent> coreEvents = input.apply(ParDo.of(new 
> ProcessEvents()));
> coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>) 
> ExtractKeyFunction).withKeyType(TypeDescriptors.longs()))
>  .setCoder(KvCoder.of(LongCoder.of(), coreEvents.getCoder()))
>  .apply(Reshuffle.of())
>  ... etc.
>
>
> On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas  wrote:
>>
>> ProcessEvents receives a Session object as input and creates a KV<Long, SharedCoreEvent> as output
>>
>> On Thu, Apr 4, 2024 at 8:52 PM Reuven Lax via user <
>> user@beam.apache.org> wrote:
>>>
>>> There are some sharp edges unfortunately around auto-inference of KV coders 
>>> and schemas. Is there a previous PCollection of type SharedCoreEvent, or is 
>>> the SharedCoreEvent created in ProcessEvents?
>>>
>>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas  wrote:

 Hello guys

 I have a question, is it possible to use KV along with AutoValueSchema
 objects? I'm having troubles when I try to use it together.

 I have an object like the following

 @AutoValue
 @DefaultSchema(AutoValueSchema.class)
 public abstract class SharedCoreEvent {

 @JsonProperty("subscriptionId")
 public abstract String getSubscription();

 
 }

 Then I have a pipeline like the following:

  input.apply(ParDo.of(new ProcessEvents()))
 .apply(Reshuffle.of()).apply(Values.create()).apply(output);

 My input is a single object and my ProcessEvents will produce tons of
 events, in a fan-out fashion. that is why I used Reshuffle here

 But when I run this pipeline it throws the following error:

 lang.IllegalStateException: Unable to return a default Coder for
 MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
 [PCollection@2131266396]. Correct one of the following root causes:
   No Coder has been manually specified;  you may do so using .setCoder().

   Inferring a Coder from the CoderRegistry failed: Cannot provide
 coder for parameterized type
 org.apache.beam.sdk.values.KV:
 Unable to provide a Coder for events.SharedCoreEvent
   Building a Coder using a registered CoderProvider failed.


 Something similar happens with my source when I use KafkaIO and the
 source produces a KV  PCollection.

 Is there any reason for this? Maybe I'm misusing the schemas?

 Really appreciate your help

 Thanks
 Ruben


Re: How to handle Inheritance with AutoValueSchema

2024-04-09 Thread Reuven Lax via user
I don't see any unit tests for inherited AutoValue accessors, so I suspect
it simply does not work today with AutoValueSchema. This is something
that's probably fixable (though such a fix does risk breaking some users).
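
If repeating the shared accessors is acceptable, one possible workaround (an
unverified sketch; EventA and its fields are hypothetical names) is to declare
the common getters directly on each @AutoValue class instead of inheriting
them, so AutoValueSchema finds them on the class itself:

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract class EventA {

  // Common fields, re-declared here rather than inherited from Event.
  public abstract String getEventId();
  public abstract long getTimestamp();

  // EventA-specific field.
  public abstract String getPayloadA();

  public static EventA create(String eventId, long timestamp, String payloadA) {
    return new AutoValue_EventA(eventId, timestamp, payloadA);
  }
}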

On Mon, Apr 8, 2024 at 11:21 PM Ruben Vargas 
wrote:

> Hello Guys
>
> I have a PCollection with a "Session" object, inside that object I
> have a list of events
>
> For each event, I have different types: EventA, EventB, EventC and so
> on. All of them extend Event, which contains the common fields.
>
> According to the AutoValue documentation, inheritance from another
> AutoValue class is not supported, but extending a plain supertype to pick up its fields is.
> (
> https://github.com/google/auto/blob/main/value/userguide/howto.md#-have-autovalue-also-implement-abstract-methods-from-my-supertypes
> )
>
> But when I run my pipeline, it fails with an NPE.
>
> Caused by: java.lang.NullPointerException
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createGetter(JavaBeanUtils.java:153)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$1(JavaBeanUtils.java:143)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
> ~[?:?]
> at
> java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
> ~[?:?]
> at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
> ~[?:?]
> at
> java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
> ~[?:?]
> at
> java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
> ~[?:?]
> at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
> ~[?:?]
> at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
> ~[?:?]
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$2(JavaBeanUtils.java:144)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
> ~[?:?]
> at
> org.apache.beam.sdk.schemas.utils.JavaBeanUtils.getGetters(JavaBeanUtils.java:138)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.AutoValueSchema.fieldValueGetters(AutoValueSchema.java:93)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:145)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:130)
> ~[beam-sdks-java-core-2.55.0.jar:?]
> at
> org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
> ~[beam-sdks-java-core-2.55.0.jar:?]
>
>
> Is this not supported? or is it a Bug?  should I file an issue in GH then?
>
> Thanks
>


Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
There are ways you can manually force the coder here. However I would first
try to split up the KV creation into two operations. Have ProcessEvents
just create a PCollection<SharedCoreEvent>, and then a following operation
to create the KV. Something like this:

input.apply(ParDo.of(new ProcessEvents()))
.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
ExtractKeyFunction).withKeyType(TypeDescriptors.longs()));

I suspect that this will allow the mechanism to better infer the final
Coder. If that doesn't work, you could always brute force it like this:

PCollection<SharedCoreEvent> coreEvents = input.apply(ParDo.of(new
ProcessEvents()));
coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
ExtractKeyFunction).withKeyType(TypeDescriptors.longs()))
 .setCoder(KvCoder.of(LongCoder.of(), coreEvents.getCoder()))
 .apply(Reshuffle.of())
 ... etc.
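
If the coder still cannot be inferred, another option (a sketch, not tested
here) is to build the value coder explicitly from the pipeline's SchemaRegistry
and hand it to setCoder; note that the registry lookups throw the checked
NoSuchSchemaException, so this needs a throws clause or try/catch:

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.values.TypeDescriptor;

SchemaRegistry registry = pipeline.getSchemaRegistry();
Coder<SharedCoreEvent> eventCoder =
    SchemaCoder.of(
        registry.getSchema(SharedCoreEvent.class),
        TypeDescriptor.of(SharedCoreEvent.class),
        registry.getToRowFunction(SharedCoreEvent.class),
        registry.getFromRowFunction(SharedCoreEvent.class));
// Then, as in the snippet above:
// .setCoder(KvCoder.of(VarLongCoder.of(), eventCoder))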


On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas  wrote:

> ProcessEvents receives a Session object as input and creates a KV<Long, SharedCoreEvent> as output
>
> On Thu, Apr 4, 2024 at 8:52 PM Reuven Lax via user <
> user@beam.apache.org> wrote:
>
>> There are some sharp edges unfortunately around auto-inference of KV
>> coders and schemas. Is there a previous PCollection of type
>> SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents?
>>
>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas 
>> wrote:
>>
>>> Hello guys
>>>
>>> I have a question, is it possible to use KV along with AutoValueSchema
>>> objects? I'm having troubles when I try to use it together.
>>>
>>> I have an object like the following
>>>
>>> @AutoValue
>>> @DefaultSchema(AutoValueSchema.class)
>>> public abstract class SharedCoreEvent {
>>>
>>> @JsonProperty("subscriptionId")
>>> public abstract String getSubscription();
>>>
>>> 
>>> }
>>>
>>> Then I have a pipeline like the following:
>>>
>>>  input.apply(ParDo.of(new ProcessEvents()))
>>> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>>>
>>> My input is a single object and my ProcessEvents will produce tons of
>>> events, in a fan-out fashion. that is why I used Reshuffle here
>>>
>>> But when I run this pipeline it throws the following error:
>>>
>>> lang.IllegalStateException: Unable to return a default Coder for
>>> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
>>> [PCollection@2131266396]. Correct one of the following root causes:
>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>
>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide
>>> coder for parameterized type
>>> org.apache.beam.sdk.values.KV:
>>> Unable to provide a Coder for events.SharedCoreEvent
>>>   Building a Coder using a registered CoderProvider failed.
>>>
>>>
>>> Something similar happens with my source when I use KafkaIO and the
>>> source produces a KV  PCollection.
>>>
>>> Is there any reason for this? Maybe I'm misusing the schemas?
>>>
>>> Really appreciate your help
>>>
>>> Thanks
>>> Ruben
>>>
>>


Re: KV with AutoValueSchema

2024-04-04 Thread Ruben Vargas
ProcessEvents receives a Session object as input and creates a KV<Long, SharedCoreEvent> as output

On Thu, Apr 4, 2024 at 8:52 PM Reuven Lax via user <
user@beam.apache.org> wrote:

> There are some sharp edges unfortunately around auto-inference of KV
> coders and schemas. Is there a previous PCollection of type
> SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents?
>
> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas 
> wrote:
>
>> Hello guys
>>
>> I have a question, is it possible to use KV along with AutoValueSchema
>> objects? I'm having troubles when I try to use it together.
>>
>> I have an object like the following
>>
>> @AutoValue
>> @DefaultSchema(AutoValueSchema.class)
>> public abstract class SharedCoreEvent {
>>
>> @JsonProperty("subscriptionId")
>> public abstract String getSubscription();
>>
>> 
>> }
>>
>> Then I have a pipeline like the following:
>>
>>  input.apply(ParDo.of(new ProcessEvents()))
>> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>>
>> My input is a single object and my ProcessEvents will produce tons of
>> events, in a fan-out fashion. that is why I used Reshuffle here
>>
>> But when I run this pipeline it throws the following error:
>>
>> lang.IllegalStateException: Unable to return a default Coder for
>> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
>> [PCollection@2131266396]. Correct one of the following root causes:
>>   No Coder has been manually specified;  you may do so using .setCoder().
>>
>>   Inferring a Coder from the CoderRegistry failed: Cannot provide
>> coder for parameterized type
>> org.apache.beam.sdk.values.KV:
>> Unable to provide a Coder for events.SharedCoreEvent
>>   Building a Coder using a registered CoderProvider failed.
>>
>>
>> Something similar happens with my source when I use KafkaIO and the
>> source produces a KV  PCollection.
>>
>> Is there any reason for this? Maybe I'm misusing the schemas?
>>
>> Really appreciate your help
>>
>> Thanks
>> Ruben
>>
>


Re: KV with AutoValueSchema

2024-04-04 Thread Reuven Lax via user
There are some sharp edges unfortunately around auto-inference of KV coders
and schemas. Is there a previous PCollection of type SharedCoreEvent, or is
the SharedCoreEvent created in ProcessEvents?

On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas  wrote:

> Hello guys
>
> I have a question, is it possible to use KV along with AutoValueSchema
> objects? I'm having troubles when I try to use it together.
>
> I have an object like the following
>
> @AutoValue
> @DefaultSchema(AutoValueSchema.class)
> public abstract class SharedCoreEvent {
>
> @JsonProperty("subscriptionId")
> public abstract String getSubscription();
>
> 
> }
>
> Then I have a pipeline like the following:
>
>  input.apply(ParDo.of(new ProcessEvents()))
> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>
> My input is a single object and my ProcessEvents will produce tons of
> events, in a fan-out fashion. that is why I used Reshuffle here
>
> But when I run this pipeline it throws the following error:
>
> lang.IllegalStateException: Unable to return a default Coder for
> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
> [PCollection@2131266396]. Correct one of the following root causes:
>   No Coder has been manually specified;  you may do so using .setCoder().
>
>   Inferring a Coder from the CoderRegistry failed: Cannot provide
> coder for parameterized type
> org.apache.beam.sdk.values.KV:
> Unable to provide a Coder for events.SharedCoreEvent
>   Building a Coder using a registered CoderProvider failed.
>
>
> Something similar happens with my source when I use KafkaIO and the
> source produces a KV  PCollection.
>
> Is there any reason for this? Maybe I'm misusing the schemas?
>
> Really appreciate your help
>
> Thanks
> Ruben
>

