State corrupted when updating

2024-07-08 Thread Ruben Vargas
Hello guys

I'm using the Flink runner. I had to update my application in order to
fix some minor bugs (an incorrect assignment on the output fields).

When I tried to update my pipeline with the latest code, I hit this error:

org.apache.beam.sdk.util.UserCodeException: java.lang.RuntimeException: Error adding to state.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate.processElement(DoFnRunnerWithMetricsUpdate.java:62)
at org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator.processElement(DoFnOperator.java:693)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error adding to state.
at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.add(FlinkStateInternals.java:760)
at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
Caused by: org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from RocksDB.
at org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:91)
at org.apache.beam.runners.flink.translation.wrappers.streaming.state.FlinkStateInternals$FlinkCombiningState.add(FlinkStateInternals.java:753)
at org.apache.beam.runners.core.SystemReduceFn.processValue(SystemReduceFn.java:119)
at org.apache.beam.runners.core.ReduceFnRunner.processElement(ReduceFnRunner.java:613)
at org.apache.beam.runners.core.ReduceFnRunner.processElements(ReduceFnRunner.java:360)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn.processElement(GroupAlsoByWindowViaWindowSetNewDoFn.java:136)
at org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetNewDoFn$DoFnInvoker.invokeProcessElement(Unknown Source)
at org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
at org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:189)
at org.apache.beam.runners.core.LateDataDroppingDoFnRunner.processElement(LateDataDroppingDoFnRunner.java:80)
at org.apa…

My app was running fine until I updated the code. The failing
task belongs to a side input (I'm running this as an AWS Kinesis
application). This is not the first issue I've found; others are sometimes
related to null fields that should not be null (and because I'm using
AutoValue builders, building them fails). So my questions are:


- How can I prevent this from happening?
- How should I deal with incompatible state when an update is needed?

For now I had to restart the application from scratch, but I'm
afraid of losing data that way.

I'd really appreciate some help (I'm using KafkaIO to read/write my data).
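
One way to reduce the chance of this kind of state incompatibility is to pin coders explicitly on the PCollections feeding grouping/stateful steps, so the bytes written into Flink state do not depend on coder or schema inference that can shift between code versions. A minimal sketch, assuming a hypothetical keyed aggregation (MyEvent, ToKeyedFn and MyCombineFn are placeholders, not the actual pipeline from this thread):

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.ParDo;

// Sketch only: MyEvent, ToKeyedFn and MyCombineFn are placeholders.
Coder<MyEvent> eventCoder = events.getCoder();   // whatever coder 'events' already uses

events
    .apply("ToKeyed", ParDo.of(new ToKeyedFn()))   // emits KV<String, MyEvent>
    // Pin the KV coder explicitly instead of relying on inference, so the
    // encoding of state written by the grouping step stays stable across deploys.
    .setCoder(KvCoder.of(StringUtf8Coder.of(), eventCoder))
    .apply("Aggregate", Combine.perKey(new MyCombineFn()));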

Regards.


How the Kafka reader works

2024-07-08 Thread Ruben Vargas
Hello guys

I'm struggling to understand how the KafkaIO reader handles the
initial offset. I enabled *commitOffsetsInFinalize*, which I understand will
commit the offsets to Kafka after each checkpoint finishes.


My question is: if I'm using a stable *group.id* and for
some reason I have to discard my state (currently due to some errors I'm
seeing when I update my app code) and start the app from scratch, will it
honor the latest offset committed to the Kafka cluster? (I'm setting
enable.auto.commit = false and auto.offset.reset = latest.)

Reading the code here:
https://github.com/apache/beam/blob/de4645d45073004b3b7d196de7ddf40ad6429eb0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L679


it seems like it's reading the current position (not the committed offset),
which is certainly different:
https://stackoverflow.com/questions/47543771/kafkaconsumer-position-vs-committed


And I see that commitSync is called here:
https://github.com/apache/beam/blob/de4645d45073004b3b7d196de7ddf40ad6429eb0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L600

So I don't really understand where resuming from the committed offset
happens, but maybe I'm misunderstanding something.
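
For reference, the reader configuration being discussed looks roughly like this (a sketch; broker, topic and group id are placeholders). Note that a plain Kafka consumer resumes from the group's committed offset when one exists; auto.offset.reset only applies when there is no committed offset for the group.

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> consumerConfig = new HashMap<>();
consumerConfig.put("group.id", "my-stable-group");   // placeholder
consumerConfig.put("enable.auto.commit", false);
consumerConfig.put("auto.offset.reset", "latest");

pipeline.apply(
    KafkaIO.<String, String>read()
        .withBootstrapServers("broker:9092")         // placeholder
        .withTopic("my-topic")                       // placeholder
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withConsumerConfigUpdates(consumerConfig)
        // Commit consumed offsets back to Kafka when a checkpoint is finalized.
        .commitOffsetsInFinalize());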

Regards.


Re: Exactly once KafkaIO with flink runner

2024-06-24 Thread Ruben Vargas
On Mon, Jun 24, 2024 at 2:02 AM Jan Lukavský  wrote:
>
> Hi,
>
> the distribution of keys to workers might not be uniform when the
> number of keys is comparable to the total parallelism. General advice would be:
>
>   a) try to increase the number of keys (EOS parallelism in this case) to be
> at least several times higher than the parallelism

Makes sense. Unfortunately, I ran into an error when I tried to set
shards > partitions. :(

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat

Do I need to change any configuration to make that work?

Thanks
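
For reference, the two knobs Jan suggests (quoted below) map roughly to the EOS shard count on the sink and the Flink runner's maxParallelism option. A sketch, assuming KafkaIO.Write#withEOS and that FlinkPipelineOptions exposes maxParallelism in the Beam version in use; the concrete values are placeholders:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringSerializer;

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
options.setMaxParallelism(1024);   // affects how evenly keys/shards hash onto workers

Pipeline pipeline = Pipeline.create(options);

// ... build the PCollection<KV<String, String>> 'records' ...

records.apply(
    KafkaIO.<String, String>write()
        .withBootstrapServers("broker:9092")         // placeholder
        .withTopic("output-topic")                   // placeholder
        .withKeySerializer(StringSerializer.class)
        .withValueSerializer(StringSerializer.class)
        // Several times the pipeline parallelism, per Jan's advice.
        .withEOS(32, "my-sink-group"));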

>
>   b) increase maxParallelism (default 128, maximum 32768), as it might
> influence the assignment of keys to downstream workers
>
> Best,
>
>   Jan
>
> On 6/21/24 05:25, Ruben Vargas wrote:
> > Image as not correctly attached. sending it again. Sorry
> >
> > Thanks
> >
> > On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas  
> > wrote:
> >> Hello guys, me again
> >>
> >> I was trying to debug the issue with the  backpressure and I noticed
> >> that even if I set the shards = 16, not all tasks are receiving
> >> messages (attaching screenshot). You know potential causes and
> >> solutions?
> >>
> >> I really appreciate any help you can provide
> >>
> >>
> >> Thank you very much!
> >>
> >> Regards.
> >>
> >>
> >> On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  
> >> wrote:
> >>> Hello again
> >>>
> >>> Thank you for all the suggestions.
> >>>
> >>> Unfortunately if I put more shards than partitions it throws me this 
> >>> exception
> >>>
> >>> "message": 
> >>> "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> >>> ids -> ToGBKResult ->
> >>> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> >>> to Kafka topic 
> >>> 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> >>> (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> >>> FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> >>> java.lang.RuntimeException:
> >>> java.lang.reflect.InvocationTargetException\n\tat
> >>> ..
> >>> ..
> >>> ..
> >>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> >>> java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> >>> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> >>> 6ms while awaiting AddOffsetsToTxn\n",
> >>>
> >>>
> >>> Any other alternative? Thank you very much!
> >>>
> >>> Regards
> >>>
> >>> On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:
> >>>> Hi,
> >>>>
> >>>> regarding aligned vs unaligned checkpoints I recommend reading [1], it
> >>>> explains it quite well. Generally, I would prefer unaligned checkpoints
> >>>> in this case.
> >>>>
> >>>> Another thing to consider is the number of shards of the EOS sink.
> >>>> Because how the shards are distributed among workers, it might be good
> >>>> idea to actually increase that to some number higher than number of
> >>>> target partitions (e.g. targetPartitions * 10 or so). Additional thing
> >>>> to consider is increasing maxParallelism of the pipeline (e.g. max value
> >>>> is 32768), as it also affects how 'evenly' Flink assigns shards to
> >>>> workers. You can check if the assignment is even using counters in the
> >>>> sink operator(s).
> >>>>
> >>>>Jan
> >>>>
> >>>> [1]
> >>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> >>>>
> >>>> On 6/19/24 05:15, Ruben Vargas wrote:
> >>>>> Hello guys
> >>>>>

Re: Exactly once KafkaIO with flink runner

2024-06-20 Thread Ruben Vargas
The image was not attached correctly; sending it again. Sorry.

Thanks

On Thu, Jun 20, 2024 at 9:25 PM Ruben Vargas  wrote:
>
> Hello guys, me again
>
> I was trying to debug the issue with the  backpressure and I noticed
> that even if I set the shards = 16, not all tasks are receiving
> messages (attaching screenshot). You know potential causes and
> solutions?
>
> I really appreciate any help you can provide
>
>
> Thank you very much!
>
> Regards.
>
>
> On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  wrote:
> >
> > Hello again
> >
> > Thank you for all the suggestions.
> >
> > Unfortunately if I put more shards than partitions it throws me this 
> > exception
> >
> > "message": 
> > "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> > ids -> ToGBKResult ->
> > PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> > to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> > (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> > FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> > java.lang.RuntimeException:
> > java.lang.reflect.InvocationTargetException\n\tat
> > ..
> > ..
> > ..
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> > java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> > org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> > 6ms while awaiting AddOffsetsToTxn\n",
> >
> >
> > Any other alternative? Thank you very much!
> >
> > Regards
> >
> > On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:
> > >
> > > Hi,
> > >
> > > regarding aligned vs unaligned checkpoints I recommend reading [1], it
> > > explains it quite well. Generally, I would prefer unaligned checkpoints
> > > in this case.
> > >
> > > Another thing to consider is the number of shards of the EOS sink.
> > > Because how the shards are distributed among workers, it might be good
> > > idea to actually increase that to some number higher than number of
> > > target partitions (e.g. targetPartitions * 10 or so). Additional thing
> > > to consider is increasing maxParallelism of the pipeline (e.g. max value
> > > is 32768), as it also affects how 'evenly' Flink assigns shards to
> > > workers. You can check if the assignment is even using counters in the
> > > sink operator(s).
> > >
> > >   Jan
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> > >
> > > On 6/19/24 05:15, Ruben Vargas wrote:
> > > > Hello guys
> > > >
> > > > Now I was able to pass that error.
> > > >
> > > > I had to set the consumer factory function
> > > > .withConsumerFactoryFn(new KafkaConsumerFactory(config))
> > > >
> > > > This is because my cluster uses SASL authentication mechanism, and the
> > > > small consumer created to fetch the topics metadata was throwing that
> > > > error.
> > > >
> > > > There are other couple things I noticed:
> > > >
> > > >   - Now I have a lot of backpressure, I assigned x3 resources to the
> > > > cluster and even with that the back pressure is high . Any advice on
> > > > this? I already increased the shards to equal the number of partitions
> > > > of the destination topic.
> > > >
> > > > - I have an error where
> > > > "State exists for shard mytopic-0, but there is no state stored with
> > > > Kafka topic mytopic' group id myconsumergroup'
> > > >
> > > > The only way I found to recover from this error is to change the group
> > > > name. Any other advice on how to recover from this error?
> > > >
> > > >
> > > > Thank you very much for following this up!
> > > >
> > > > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  
> > > > wrote:
> > > >> Hello Jan
> > > >>
> > > >> Thanks for the suggestions
> > > >>
> > > >> Any benefit of using aligned vs unaligned?
> > > >>
> > > >>
> > > >> At the end I found one problem that was preventing  flink from doing
> > &g

Re: Exactly once KafkaIO with flink runner

2024-06-20 Thread Ruben Vargas
Hello guys, me again.

I was trying to debug the backpressure issue and I noticed that even
if I set shards = 16, not all tasks are receiving messages (attaching a
screenshot). Do you know potential causes and solutions?

I really appreciate any help you can provide.


Thank you very much!

Regards.


On Wed, Jun 19, 2024 at 11:09 PM Ruben Vargas  wrote:
>
> Hello again
>
> Thank you for all the suggestions.
>
> Unfortunately if I put more shards than partitions it throws me this exception
>
> "message": 
> "PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
> ids -> ToGBKResult ->
> PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
> to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
> (4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
> FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
> java.lang.RuntimeException:
> java.lang.reflect.InvocationTargetException\n\tat
> ..
> ..
> ..
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
> java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
> org.apache.kafka.common.errors.TimeoutException: Timeout expired after
> 6ms while awaiting AddOffsetsToTxn\n",
>
>
> Any other alternative? Thank you very much!
>
> Regards
>
> On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:
> >
> > Hi,
> >
> > regarding aligned vs unaligned checkpoints I recommend reading [1], it
> > explains it quite well. Generally, I would prefer unaligned checkpoints
> > in this case.
> >
> > Another thing to consider is the number of shards of the EOS sink.
> > Because how the shards are distributed among workers, it might be good
> > idea to actually increase that to some number higher than number of
> > target partitions (e.g. targetPartitions * 10 or so). Additional thing
> > to consider is increasing maxParallelism of the pipeline (e.g. max value
> > is 32768), as it also affects how 'evenly' Flink assigns shards to
> > workers. You can check if the assignment is even using counters in the
> > sink operator(s).
> >
> >   Jan
> >
> > [1]
> > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
> >
> > On 6/19/24 05:15, Ruben Vargas wrote:
> > > Hello guys
> > >
> > > Now I was able to pass that error.
> > >
> > > I had to set the consumer factory function
> > > .withConsumerFactoryFn(new KafkaConsumerFactory(config))
> > >
> > > This is because my cluster uses SASL authentication mechanism, and the
> > > small consumer created to fetch the topics metadata was throwing that
> > > error.
> > >
> > > There are other couple things I noticed:
> > >
> > >   - Now I have a lot of backpressure, I assigned x3 resources to the
> > > cluster and even with that the back pressure is high . Any advice on
> > > this? I already increased the shards to equal the number of partitions
> > > of the destination topic.
> > >
> > > - I have an error where
> > > "State exists for shard mytopic-0, but there is no state stored with
> > > Kafka topic mytopic' group id myconsumergroup'
> > >
> > > The only way I found to recover from this error is to change the group
> > > name. Any other advice on how to recover from this error?
> > >
> > >
> > > Thank you very much for following this up!
> > >
> > > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  
> > > wrote:
> > >> Hello Jan
> > >>
> > >> Thanks for the suggestions
> > >>
> > >> Any benefit of using aligned vs unaligned?
> > >>
> > >>
> > >> At the end I found one problem that was preventing  flink from doing
> > >> the checkpointing. It was a DoFn function that has some "non
> > >> serializable" objects, so I made those transient and initialized those
> > >> on the setup.
> > >>
> > >> Weird, because I usually was able to detect these kinds of errors just
> > >> running in the direct runner, or even in flink before enabling EOS.
> > >>
> > >>
> > >> Now I'm facing another weird issue
> > >>
> > >> org.apache.beam.sdk.util.UserCodeException:
> > >> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> > >> expired b

Re: Parallelism of a side input

2024-06-20 Thread Ruben Vargas
The only downside of this approach is that, at least in the Flink runner, it
consumes a task slot :(

On Wed, Jun 12, 2024 at 9:38 AM Robert Bradshaw <
rober...@google.com> wrote:

> On Wed, Jun 12, 2024 at 7:56 AM Ruben Vargas 
> wrote:
> >
> > The approach looks good. but one question
> >
> > My understanding is that this will schedule for example 8 operators
> across the workers, but only one of them will be processing, the others
> remain idle? Are those consuming resources in some way? I'm assuming may be
> is not significant.
>
> That is correct, but the resources consumed by an idle operator should
> be negligible.
>
> > Thanks.
> >
> > El El vie, 7 de jun de 2024 a la(s) 3:56 p.m., Robert Bradshaw via user <
> user@beam.apache.org> escribió:
> >>
> >> You can always limit the parallelism by assigning a single key to
> >> every element and then doing a grouping or reshuffle[1] on that key
> >> before processing the elements. Even if the operator parallelism for
> >> that step is technically, say, eight, your effective parallelism will
> >> be exactly one.
> >>
> >> [1]
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html
> >>
> >> On Fri, Jun 7, 2024 at 2:13 PM Ruben Vargas 
> wrote:
> >> >
> >> > Hello guys
> >> >
> >> > One question, I have a side input which fetches an endpoint each 30
> >> > min, I pretty much copied the example here:
> >> > https://beam.apache.org/documentation/patterns/side-inputs/ but added
> >> > some logic to fetch the endpoint and parse the payload.
> >> >
> >> > My question is: it is possible to control the parallelism of this
> >> > single ParDo that does the fetch/transform? I don't think I need a lot
> >> > of parallelism for that one. I'm currently using flink runner and I
> >> > see the parallelism is 8 (which is the general parallelism for my
> >> > flink cluster).
> >> >
> >> > Is it possible to set it to 1 for example?
> >> >
> >> >
> >> > Regards.
>


Re: Exactly once KafkaIO with flink runner

2024-06-19 Thread Ruben Vargas
Hello again

Thank you for all the suggestions.

Unfortunately, if I set more shards than partitions it throws this exception:

"message": 
"PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Persist
ids -> ToGBKResult ->
PipelineBuilder-debug-output/KafkaIO.Write/KafkaIO.WriteRecords/KafkaExactlyOnceSink/Write
to Kafka topic 'behavioral-signals-log-stream'/ParMultiDo(ExactlyOnceWriter)
(4/8)#0 (76ed5be34c202de19384b829f09d6346) switched from RUNNING to
FAILED with failure cause: org.apache.beam.sdk.util.UserCodeException:
java.lang.RuntimeException:
java.lang.reflect.InvocationTargetException\n\tat
..
..
..
org.apache.flink.runtime.taskmanager.Task.run(Task.java:568)\n\tat
java.base/java.lang.Thread.run(Thread.java:829)\nCaused by:
org.apache.kafka.common.errors.TimeoutException: Timeout expired after
6ms while awaiting AddOffsetsToTxn\n",


Is there any other alternative? Thank you very much!

Regards

On Wed, Jun 19, 2024 at 1:00 AM Jan Lukavský  wrote:
>
> Hi,
>
> regarding aligned vs unaligned checkpoints I recommend reading [1], it
> explains it quite well. Generally, I would prefer unaligned checkpoints
> in this case.
>
> Another thing to consider is the number of shards of the EOS sink.
> Because how the shards are distributed among workers, it might be good
> idea to actually increase that to some number higher than number of
> target partitions (e.g. targetPartitions * 10 or so). Additional thing
> to consider is increasing maxParallelism of the pipeline (e.g. max value
> is 32768), as it also affects how 'evenly' Flink assigns shards to
> workers. You can check if the assignment is even using counters in the
> sink operator(s).
>
>   Jan
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/checkpointing_under_backpressure/
>
> On 6/19/24 05:15, Ruben Vargas wrote:
> > Hello guys
> >
> > Now I was able to pass that error.
> >
> > I had to set the consumer factory function
> > .withConsumerFactoryFn(new KafkaConsumerFactory(config))
> >
> > This is because my cluster uses SASL authentication mechanism, and the
> > small consumer created to fetch the topics metadata was throwing that
> > error.
> >
> > There are other couple things I noticed:
> >
> >   - Now I have a lot of backpressure, I assigned x3 resources to the
> > cluster and even with that the back pressure is high . Any advice on
> > this? I already increased the shards to equal the number of partitions
> > of the destination topic.
> >
> > - I have an error where
> > "State exists for shard mytopic-0, but there is no state stored with
> > Kafka topic mytopic' group id myconsumergroup'
> >
> > The only way I found to recover from this error is to change the group
> > name. Any other advice on how to recover from this error?
> >
> >
> > Thank you very much for following this up!
> >
> > On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  
> > wrote:
> >> Hello Jan
> >>
> >> Thanks for the suggestions
> >>
> >> Any benefit of using aligned vs unaligned?
> >>
> >>
> >> At the end I found one problem that was preventing  flink from doing
> >> the checkpointing. It was a DoFn function that has some "non
> >> serializable" objects, so I made those transient and initialized those
> >> on the setup.
> >>
> >> Weird, because I usually was able to detect these kinds of errors just
> >> running in the direct runner, or even in flink before enabling EOS.
> >>
> >>
> >> Now I'm facing another weird issue
> >>
> >> org.apache.beam.sdk.util.UserCodeException:
> >> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> >> expired before the last committed offset for partitions
> >> [behavioral-signals-6] could be determined. Try tuning
> >> default.api.timeout.ms larger to relax the threshold.
> >> at 
> >> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> >> at 
> >> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> >> Source)
> >> at 
> >> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
> >>
> >> I tried to extend the timeout and it didn't work, my shards are equal
> >> to my number of partitions.
> >>
> >> I appreciate any kind of guidance
> >>
> >> Thanks.
> >>
> >> On Tue, Jun 18

Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Ruben Vargas
Hello guys

Now I was able to get past that error.

I had to set the consumer factory function:
.withConsumerFactoryFn(new KafkaConsumerFactory(config))

This is because my cluster uses the SASL authentication mechanism, and the
small consumer created to fetch the topic metadata was throwing that
error.
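
For reference, a consumer factory along these lines might look roughly like the following (a sketch; the factory name and the SASL property values are placeholders, not the actual code from this thread):

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical factory: KafkaIO passes in the consumer config it built; we add
// the SASL settings before creating the consumer, so the small metadata
// consumer is authenticated as well.
class SaslConsumerFactory
    implements SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> {

  @Override
  public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
    Map<String, Object> conf = new HashMap<>(config);
    conf.put("security.protocol", "SASL_SSL");          // placeholder values
    conf.put("sasl.mechanism", "SCRAM-SHA-512");
    conf.put("sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
            + "username=\"user\" password=\"secret\";");
    return new KafkaConsumer<>(conf);
  }
}

// Usage: KafkaIO.<String, String>read()....withConsumerFactoryFn(new SaslConsumerFactory())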

There are a couple of other things I noticed:

 - Now I have a lot of backpressure. I assigned 3x the resources to the
cluster and even with that the backpressure is high. Any advice on
this? I already increased the shards to equal the number of partitions
of the destination topic.

- I get an error:
"State exists for shard mytopic-0, but there is no state stored with
Kafka topic 'mytopic', group id 'myconsumergroup'"

The only way I found to recover from this error is to change the group
name. Any other advice on how to recover from it?


Thank you very much for following this up!

On Tue, Jun 18, 2024 at 8:44 AM Ruben Vargas  wrote:
>
> Hello Jan
>
> Thanks for the suggestions
>
> Any benefit of using aligned vs unaligned?
>
>
> At the end I found one problem that was preventing  flink from doing
> the checkpointing. It was a DoFn function that has some "non
> serializable" objects, so I made those transient and initialized those
> on the setup.
>
> Weird, because I usually was able to detect these kinds of errors just
> running in the direct runner, or even in flink before enabling EOS.
>
>
> Now I'm facing another weird issue
>
> org.apache.beam.sdk.util.UserCodeException:
> org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
> expired before the last committed offset for partitions
> [behavioral-signals-6] could be determined. Try tuning
> default.api.timeout.ms larger to relax the threshold.
> at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
> at 
> org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
> Source)
> at 
> org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)
>
> I tried to extend the timeout and it didn't work, my shards are equal
> to my number of partitions.
>
> I appreciate any kind of guidance
>
> Thanks.
>
> On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský  wrote:
> >
> > I'd suggest:
> >  a) use unaligned checkpoints, if possible
> >
> >  b) verify the number of buckets you use for EOS sink, this limits 
> > parallelism [1].
> >
> > Best,
> >
> >  Jan
> >
> > [1] 
> > https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
> >
> > On 6/18/24 09:32, Ruben Vargas wrote:
> >
> > Hello Lukavsky
> >
> > Thanks for your reply !
> >
> > I thought was due backpreassure but i increased the resources of the 
> > cluster and problem still presist. More that that, data stop flowing and 
> > the checkpoint still fail.
> >
> > I have configured the checkpoint to do it per minute. The timeout is 1h. Is 
> > aligned checkpoint.
> >
> > El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský 
> >  escribió:
> >>
> >> H Ruben,
> >>
> >> from the provided screenshot it seems to me, that the pipeline in
> >> backpressured by the sink. Can you please share your checkpoint
> >> configuration? Are you using unaligned checkpoints? What is the
> >> checkpointing interval and the volume of data coming in from the source?
> >> With EOS data is committed after checkpoint, before that, the data is
> >> buffered in state, which makes the sink more resource intensive.
> >>
> >>   Jan
> >>
> >> On 6/18/24 05:30, Ruben Vargas wrote:
> >> > Attached a better image of the console.
> >> >
> >> > Thanks!
> >> >
> >> > On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas  
> >> > wrote:
> >> >> Hello guys
> >> >>
> >> >> Wondering if some of you have experiences enabling Exactly Once in
> >> >> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
> >> >> where all the checkpoints are failing. I cannot see any exception on
> >> >> the logs.
> >> >>
> >> >> Flink console only mentions this "Asynchronous task checkpoint
> >> >> failed." I also noticed that some operators don't acknowledge the
> >> >> checkpointing  (Attached a screenshot).
> >> >>
> >> >> I did this:
> >> >>
> >> >> 1) KafkaIO.Read:
> >> >>
> >> >> update consumer properties with enable.auto.commit = false
> >> >> .withReadCommitted()
> >> >> .commitOffsetsInFinalize()
> >> >>
> >> >> 2) KafkaIO#write:
> >> >>
> >> >> .withEOS(numShards, sinkGroupId)
> >> >>
> >> >> But my application is not able to deliver messages to the output topic
> >> >> due the checkpoint failing.
> >> >> I also reviewed the timeout and other time sensitive parameters, those
> >> >> are high right now.
> >> >>
> >> >> I really appreciate your guidance on this. Thank you


Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Ruben Vargas
Hello Jan

Thanks for the suggestions

Any benefit of using aligned vs unaligned?


In the end I found one problem that was preventing Flink from doing
the checkpointing. It was a DoFn that had some non-serializable
objects, so I made those transient and initialized them in the setup.

Weird, because I was usually able to detect these kinds of errors just by
running in the direct runner, or even in Flink before enabling EOS.
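
For reference, the transient-field-plus-@Setup pattern looks roughly like this (a sketch; the HTTP client is just an example of a non-serializable member, not the actual dependency from this thread):

import java.net.http.HttpClient;
import org.apache.beam.sdk.transforms.DoFn;

class EnrichFn extends DoFn<String, String> {
  // Non-serializable dependency: transient so the DoFn itself stays serializable.
  private transient HttpClient client;

  @Setup
  public void setup() {
    // Re-created on each worker after the DoFn is deserialized.
    client = HttpClient.newHttpClient();
  }

  @ProcessElement
  public void processElement(@Element String in, OutputReceiver<String> out) {
    // ... use 'client' here ...
    out.output(in);
  }
}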


Now I'm facing another weird issue

org.apache.beam.sdk.util.UserCodeException:
org.apache.kafka.common.errors.TimeoutException: Timeout of 6ms
expired before the last committed offset for partitions
[behavioral-signals-6] could be determined. Try tuning
default.api.timeout.ms larger to relax the threshold.
at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
at 
org.apache.beam.sdk.io.kafka.KafkaExactlyOnceSink$ExactlyOnceWriter$DoFnInvoker.invokeProcessElement(Unknown
Source)
at 
org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:212)

I tried extending the timeout and it didn't work; my shards are equal
to my number of partitions.

I appreciate any kind of guidance

Thanks.

On Tue, Jun 18, 2024 at 5:56 AM Jan Lukavský  wrote:
>
> I'd suggest:
>  a) use unaligned checkpoints, if possible
>
>  b) verify the number of buckets you use for EOS sink, this limits 
> parallelism [1].
>
> Best,
>
>  Jan
>
> [1] 
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.WriteRecords.html#withEOS-int-java.lang.String-
>
> On 6/18/24 09:32, Ruben Vargas wrote:
>
> Hello Lukavsky
>
> Thanks for your reply !
>
> I thought was due backpreassure but i increased the resources of the cluster 
> and problem still presist. More that that, data stop flowing and the 
> checkpoint still fail.
>
> I have configured the checkpoint to do it per minute. The timeout is 1h. Is 
> aligned checkpoint.
>
> El El mar, 18 de jun de 2024 a la(s) 1:14 a.m., Jan Lukavský 
>  escribió:
>>
>> H Ruben,
>>
>> from the provided screenshot it seems to me, that the pipeline in
>> backpressured by the sink. Can you please share your checkpoint
>> configuration? Are you using unaligned checkpoints? What is the
>> checkpointing interval and the volume of data coming in from the source?
>> With EOS data is committed after checkpoint, before that, the data is
>> buffered in state, which makes the sink more resource intensive.
>>
>>   Jan
>>
>> On 6/18/24 05:30, Ruben Vargas wrote:
>> > Attached a better image of the console.
>> >
>> > Thanks!
>> >
>> > On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas  
>> > wrote:
>> >> Hello guys
>> >>
>> >> Wondering if some of you have experiences enabling Exactly Once in
>> >> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
>> >> where all the checkpoints are failing. I cannot see any exception on
>> >> the logs.
>> >>
>> >> Flink console only mentions this "Asynchronous task checkpoint
>> >> failed." I also noticed that some operators don't acknowledge the
>> >> checkpointing  (Attached a screenshot).
>> >>
>> >> I did this:
>> >>
>> >> 1) KafkaIO.Read:
>> >>
>> >> update consumer properties with enable.auto.commit = false
>> >> .withReadCommitted()
>> >> .commitOffsetsInFinalize()
>> >>
>> >> 2) KafkaIO#write:
>> >>
>> >> .withEOS(numShards, sinkGroupId)
>> >>
>> >> But my application is not able to deliver messages to the output topic
>> >> due the checkpoint failing.
>> >> I also reviewed the timeout and other time sensitive parameters, those
>> >> are high right now.
>> >>
>> >> I really appreciate your guidance on this. Thank you


Re: Exactly once KafkaIO with flink runner

2024-06-18 Thread Ruben Vargas
Hello Lukavsky

Thanks for your reply !

I thought it was due to backpressure, but I increased the resources of the
cluster and the problem still persists. More than that, data stops flowing and
the checkpoints still fail.

I have configured checkpointing to run every minute. The timeout is 1h. It is
an aligned checkpoint.
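
For reference, that checkpoint setup expressed as Flink runner pipeline options would look roughly like this (a sketch, assuming FlinkPipelineOptions exposes these settings in the Beam version in use; the values mirror the ones described above):

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

FlinkPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(FlinkPipelineOptions.class);
options.setCheckpointingInterval(60_000L);       // checkpoint every minute
options.setCheckpointTimeoutMillis(3_600_000L);  // 1h checkpoint timeout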

On Tue, Jun 18, 2024 at 1:14 AM Jan Lukavský <
je...@seznam.cz> wrote:

> H Ruben,
>
> from the provided screenshot it seems to me, that the pipeline in
> backpressured by the sink. Can you please share your checkpoint
> configuration? Are you using unaligned checkpoints? What is the
> checkpointing interval and the volume of data coming in from the source?
> With EOS data is committed after checkpoint, before that, the data is
> buffered in state, which makes the sink more resource intensive.
>
>   Jan
>
> On 6/18/24 05:30, Ruben Vargas wrote:
> > Attached a better image of the console.
> >
> > Thanks!
> >
> > On Mon, Jun 17, 2024 at 9:28 PM Ruben Vargas 
> wrote:
> >> Hello guys
> >>
> >> Wondering if some of you have experiences enabling Exactly Once in
> >> KafkaIO with Flink runner? I enabled it and now I'm facing an issue
> >> where all the checkpoints are failing. I cannot see any exception on
> >> the logs.
> >>
> >> Flink console only mentions this "Asynchronous task checkpoint
> >> failed." I also noticed that some operators don't acknowledge the
> >> checkpointing  (Attached a screenshot).
> >>
> >> I did this:
> >>
> >> 1) KafkaIO.Read:
> >>
> >> update consumer properties with enable.auto.commit = false
> >> .withReadCommitted()
> >> .commitOffsetsInFinalize()
> >>
> >> 2) KafkaIO#write:
> >>
> >> .withEOS(numShards, sinkGroupId)
> >>
> >> But my application is not able to deliver messages to the output topic
> >> due the checkpoint failing.
> >> I also reviewed the timeout and other time sensitive parameters, those
> >> are high right now.
> >>
> >> I really appreciate your guidance on this. Thank you
>


Re: How windowing is implemented on Flink runner

2024-06-12 Thread Ruben Vargas
I imagined it but wasn't sure!

Thanks for the clarification!

On Wed, Jun 12, 2024 at 1:42 PM Robert Bradshaw via user
 wrote:
>
> Beam implements Windowing itself (via state and timers) rather than
> deferring to Flink's implementation.
>
> On Wed, Jun 12, 2024 at 11:55 AM Ruben Vargas  wrote:
> >
> > Hello guys
> >
> > May be a silly question,
> >
> > But in the Flink runner, the window implementation uses the Flink
> > windowing? Does that mean the runner will have performance issues like
> > Flink itself? see this:
> > https://issues.apache.org/jira/browse/FLINK-7001
> >
> > I'm asking because I see the issue, it mentions different concepts
> > that Beam already handles at the API level. So my suspicion is that
> > the Beam model handles windowing a little differently from the pure
> > Flink app. But I'm not sure..
> >
> >
> > Regards.


How windowing is implemented on Flink runner

2024-06-12 Thread Ruben Vargas
Hello guys

Maybe a silly question,

In the Flink runner, does the window implementation use Flink's
windowing? Does that mean the runner will have performance issues like
Flink itself? See this:
https://issues.apache.org/jira/browse/FLINK-7001

I'm asking because, looking at the issue, it mentions concepts
that Beam already handles at the API level. So my suspicion is that
the Beam model handles windowing a little differently from a pure
Flink app. But I'm not sure.


Regards.


Re: Parallelism of a side input

2024-06-12 Thread Ruben Vargas
The approach looks good, but one question:

My understanding is that this will schedule, for example, 8 operators across
the workers, but only one of them will be processing while the others
remain idle? Are those consuming resources in some way? I'm assuming it may
not be significant.

Thanks.

On Fri, Jun 7, 2024 at 3:56 PM Robert Bradshaw via user <
user@beam.apache.org> wrote:

> You can always limit the parallelism by assigning a single key to
> every element and then doing a grouping or reshuffle[1] on that key
> before processing the elements. Even if the operator parallelism for
> that step is technically, say, eight, your effective parallelism will
> be exactly one.
>
> [1]
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/Reshuffle.html
>
> On Fri, Jun 7, 2024 at 2:13 PM Ruben Vargas 
> wrote:
> >
> > Hello guys
> >
> > One question, I have a side input which fetches an endpoint each 30
> > min, I pretty much copied the example here:
> > https://beam.apache.org/documentation/patterns/side-inputs/ but added
> > some logic to fetch the endpoint and parse the payload.
> >
> > My question is: it is possible to control the parallelism of this
> > single ParDo that does the fetch/transform? I don't think I need a lot
> > of parallelism for that one. I'm currently using flink runner and I
> > see the parallelism is 8 (which is the general parallelism for my
> > flink cluster).
> >
> > Is it possible to set it to 1 for example?
> >
> >
> > Regards.
>
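
For reference, the single-key trick Robert describes above might look roughly like this (a sketch; FetchEndpointFn and the element types are placeholders):

import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

PCollection<Result> fetched =
    triggers
        // Every element gets the same key, so the grouping collapses effective
        // parallelism to one even if the operator parallelism is 8.
        .apply(WithKeys.of("singleton").withKeyType(TypeDescriptors.strings()))
        .apply(Reshuffle.of())
        .apply(Values.create())
        .apply("FetchAndParse", ParDo.of(new FetchEndpointFn()));  // hypothetical DoFn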


Parallelism of a side input

2024-06-07 Thread Ruben Vargas
Hello guys

One question: I have a side input which fetches an endpoint every 30
minutes. I pretty much copied the example here:
https://beam.apache.org/documentation/patterns/side-inputs/ but added
some logic to fetch the endpoint and parse the payload.

My question is: is it possible to control the parallelism of this
single ParDo that does the fetch/transform? I don't think I need a lot
of parallelism for that one. I'm currently using the Flink runner and I
see the parallelism is 8 (which is the general parallelism for my
Flink cluster).

Is it possible to set it to 1 for example?


Regards.


Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Yeah, unfortunately the data on the endpoint could change at any point
in time and I need to make sure I have the latest version :/

That limits my options here. But I also have other sources that can
benefit from this caching :)

Thank you very much!

On Mon, Apr 15, 2024 at 9:37 AM XQ Hu  wrote:
>
> I am not sure you still need to do batching since Web API can handle caching.
>
> If you really need it, I think GoupIntoBatches is a good way to go.
>
> On Mon, Apr 15, 2024 at 11:30 AM Ruben Vargas  wrote:
>>
>> Is there a way to do batching in that transformation? I'm assuming for
>> now no. or may be using in conjuntion with GoupIntoBatches
>>
>> On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas  wrote:
>> >
>> > Interesting
>> >
>> > I think the cache feature could be interesting for some use cases I have.
>> >
>> > On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
>> > >
>> > > For the new web API IO, the page lists these features:
>> > >
>> > > developers provide minimal code that invokes Web API endpoint
>> > > delegate to the transform to handle request retries and exponential 
>> > > backoff
>> > > optional caching of request and response associations
>> > > optional metrics
>> > >
>> > >
>> > > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas  
>> > > wrote:
>> > >>
>> > >> That one looks interesting
>> > >>
>> > >> What is not clear to me is what are the advantages of using it? Is
>> > >> only the error/retry handling? anything in terms of performance?
>> > >>
>> > >> My PCollection is unbounded but I was thinking of sending my messages
>> > >> in batches to the external API in order to gain some performance
>> > >> (don't expect to send 1 http request per message).
>> > >>
>> > >> Thank you very much for all your responses!
>> > >>
>> > >>
>> > >> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  
>> > >> wrote:
>> > >> >
>> > >> > To enrich your data, have you checked 
>> > >> > https://cloud.google.com/dataflow/docs/guides/enrichment?
>> > >> >
>> > >> > This transform is built on top of 
>> > >> > https://beam.apache.org/documentation/io/built-in/webapis/
>> > >> >
>> > >> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas 
>> > >> >  wrote:
>> > >> >>
>> > >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  
>> > >> >> wrote:
>> > >> >> >
>> > >> >> > Here is an example from a book that I'm reading now and it may be 
>> > >> >> > applicable.
>> > >> >> >
>> > >> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
>> > >> >> > PYTHON - ord(id[0]) % 100
>> > >> >>
>> > >> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>> > >> >>
>> > >> >> >
>> > >> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian 
>> > >> >> >  wrote:
>> > >> >> >>
>> > >> >> >> How about just keeping track of a buffer and flush the buffer 
>> > >> >> >> after 100 messages and if there is a buffer on finish_bundle as 
>> > >> >> >> well?
>> > >> >> >>
>> > >> >> >>
>> > >> >>
>> > >> >> If this is in memory, It could lead to potential loss of data. That 
>> > >> >> is
>> > >> >> why the state is used or at least that is my understanding. but maybe
>> > >> >> there is a way to do this in the state?
>> > >> >>
>> > >> >>
>> > >> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas 
>> > >> >> >>  wrote:
>> > >> >> >>>
>> > >> >> >>> Hello guys
>> > >> >> >>>
>> > >> >> >>> Maybe this question was already answered, but I cannot find it  
>> > >> >> >>> and
>> > >> >> >>> want some more input on this topic.
>> > >> >> >>>
>> > >> >> >>> I have some messages that don't have any particular key 
>> > >> >> >>> candidate,
>> > >> >> >>> except the ID,  but I don't want to use it because the idea is to
>> > >> >> >>> group multiple IDs in the same batch.
>> > >> >> >>>
>> > >> >> >>> This is my use case:
>> > >> >> >>>
>> > >> >> >>> I have an endpoint where I'm gonna send the message ID, this 
>> > >> >> >>> endpoint
>> > >> >> >>> is gonna return me certain information which I will use to 
>> > >> >> >>> enrich my
>> > >> >> >>> message. In order to avoid fetching the endpoint per message I 
>> > >> >> >>> want to
>> > >> >> >>> batch it in 100 and send the 100 IDs in one request ( the 
>> > >> >> >>> endpoint
>> > >> >> >>> supports it) . I was thinking on using GroupIntoBatches.
>> > >> >> >>>
>> > >> >> >>> - If I choose the ID as the key, my understanding is that it 
>> > >> >> >>> won't
>> > >> >> >>> work in the way I want (because it will form batches of the same 
>> > >> >> >>> ID).
>> > >> >> >>> - Use a constant will be a problem for parallelism, is that 
>> > >> >> >>> correct?
>> > >> >> >>>
>> > >> >> >>> Then my question is, what should I use as a key? Maybe something
>> > >> >> >>> regarding the timestamp? so I can have groups of messages that 
>> > >> >> >>> arrive
>> > >> >> >>> at a certain second?
>> > >> >> >>>
>> > >> >> >>> Any suggestions would be appreciated
>> > >> >> >>>
>> > >> >> >>> Thanks.


Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Is there a way to do batching in that transformation? I'm assuming for
now there isn't, or maybe it can be used in conjunction with GroupIntoBatches.

On Mon, Apr 15, 2024 at 9:29 AM Ruben Vargas  wrote:
>
> Interesting
>
> I think the cache feature could be interesting for some use cases I have.
>
> On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
> >
> > For the new web API IO, the page lists these features:
> >
> > developers provide minimal code that invokes Web API endpoint
> > delegate to the transform to handle request retries and exponential backoff
> > optional caching of request and response associations
> > optional metrics
> >
> >
> > On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas  
> > wrote:
> >>
> >> That one looks interesting
> >>
> >> What is not clear to me is what are the advantages of using it? Is
> >> only the error/retry handling? anything in terms of performance?
> >>
> >> My PCollection is unbounded but I was thinking of sending my messages
> >> in batches to the external API in order to gain some performance
> >> (don't expect to send 1 http request per message).
> >>
> >> Thank you very much for all your responses!
> >>
> >>
> >> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  
> >> wrote:
> >> >
> >> > To enrich your data, have you checked 
> >> > https://cloud.google.com/dataflow/docs/guides/enrichment?
> >> >
> >> > This transform is built on top of 
> >> > https://beam.apache.org/documentation/io/built-in/webapis/
> >> >
> >> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas  
> >> > wrote:
> >> >>
> >> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
> >> >> >
> >> >> > Here is an example from a book that I'm reading now and it may be 
> >> >> > applicable.
> >> >> >
> >> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> >> >> > PYTHON - ord(id[0]) % 100
> >> >>
> >> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
> >> >>
> >> >> >
> >> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  
> >> >> > wrote:
> >> >> >>
> >> >> >> How about just keeping track of a buffer and flush the buffer after 
> >> >> >> 100 messages and if there is a buffer on finish_bundle as well?
> >> >> >>
> >> >> >>
> >> >>
> >> >> If this is in memory, It could lead to potential loss of data. That is
> >> >> why the state is used or at least that is my understanding. but maybe
> >> >> there is a way to do this in the state?
> >> >>
> >> >>
> >> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  
> >> >> >> wrote:
> >> >> >>>
> >> >> >>> Hello guys
> >> >> >>>
> >> >> >>> Maybe this question was already answered, but I cannot find it  and
> >> >> >>> want some more input on this topic.
> >> >> >>>
> >> >> >>> I have some messages that don't have any particular key candidate,
> >> >> >>> except the ID,  but I don't want to use it because the idea is to
> >> >> >>> group multiple IDs in the same batch.
> >> >> >>>
> >> >> >>> This is my use case:
> >> >> >>>
> >> >> >>> I have an endpoint where I'm gonna send the message ID, this 
> >> >> >>> endpoint
> >> >> >>> is gonna return me certain information which I will use to enrich my
> >> >> >>> message. In order to avoid fetching the endpoint per message I want 
> >> >> >>> to
> >> >> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
> >> >> >>> supports it) . I was thinking on using GroupIntoBatches.
> >> >> >>>
> >> >> >>> - If I choose the ID as the key, my understanding is that it won't
> >> >> >>> work in the way I want (because it will form batches of the same 
> >> >> >>> ID).
> >> >> >>> - Use a constant will be a problem for parallelism, is that correct?
> >> >> >>>
> >> >> >>> Then my question is, what should I use as a key? Maybe something
> >> >> >>> regarding the timestamp? so I can have groups of messages that 
> >> >> >>> arrive
> >> >> >>> at a certain second?
> >> >> >>>
> >> >> >>> Any suggestions would be appreciated
> >> >> >>>
> >> >> >>> Thanks.


Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
Interesting

I think the caching feature could be useful for some use cases I have.

On Mon, Apr 15, 2024 at 9:18 AM XQ Hu  wrote:
>
> For the new web API IO, the page lists these features:
>
> developers provide minimal code that invokes Web API endpoint
> delegate to the transform to handle request retries and exponential backoff
> optional caching of request and response associations
> optional metrics
>
>
> On Mon, Apr 15, 2024 at 10:38 AM Ruben Vargas  wrote:
>>
>> That one looks interesting
>>
>> What is not clear to me is what are the advantages of using it? Is
>> only the error/retry handling? anything in terms of performance?
>>
>> My PCollection is unbounded but I was thinking of sending my messages
>> in batches to the external API in order to gain some performance
>> (don't expect to send 1 http request per message).
>>
>> Thank you very much for all your responses!
>>
>>
>> On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  wrote:
>> >
>> > To enrich your data, have you checked 
>> > https://cloud.google.com/dataflow/docs/guides/enrichment?
>> >
>> > This transform is built on top of 
>> > https://beam.apache.org/documentation/io/built-in/webapis/
>> >
>> > On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas  
>> > wrote:
>> >>
>> >> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
>> >> >
>> >> > Here is an example from a book that I'm reading now and it may be 
>> >> > applicable.
>> >> >
>> >> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
>> >> > PYTHON - ord(id[0]) % 100
>> >>
>> >> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>> >>
>> >> >
>> >> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  
>> >> > wrote:
>> >> >>
>> >> >> How about just keeping track of a buffer and flush the buffer after 
>> >> >> 100 messages and if there is a buffer on finish_bundle as well?
>> >> >>
>> >> >>
>> >>
>> >> If this is in memory, It could lead to potential loss of data. That is
>> >> why the state is used or at least that is my understanding. but maybe
>> >> there is a way to do this in the state?
>> >>
>> >>
>> >> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  
>> >> >> wrote:
>> >> >>>
>> >> >>> Hello guys
>> >> >>>
>> >> >>> Maybe this question was already answered, but I cannot find it  and
>> >> >>> want some more input on this topic.
>> >> >>>
>> >> >>> I have some messages that don't have any particular key candidate,
>> >> >>> except the ID,  but I don't want to use it because the idea is to
>> >> >>> group multiple IDs in the same batch.
>> >> >>>
>> >> >>> This is my use case:
>> >> >>>
>> >> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
>> >> >>> is gonna return me certain information which I will use to enrich my
>> >> >>> message. In order to avoid fetching the endpoint per message I want to
>> >> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>> >> >>> supports it) . I was thinking on using GroupIntoBatches.
>> >> >>>
>> >> >>> - If I choose the ID as the key, my understanding is that it won't
>> >> >>> work in the way I want (because it will form batches of the same ID).
>> >> >>> - Use a constant will be a problem for parallelism, is that correct?
>> >> >>>
>> >> >>> Then my question is, what should I use as a key? Maybe something
>> >> >>> regarding the timestamp? so I can have groups of messages that arrive
>> >> >>> at a certain second?
>> >> >>>
>> >> >>> Any suggestions would be appreciated
>> >> >>>
>> >> >>> Thanks.


Re: Any recommendation for key for GroupIntoBatches

2024-04-15 Thread Ruben Vargas
That one looks interesting

What is not clear to me is what the advantages of using it are. Is it
only the error/retry handling? Anything in terms of performance?

My PCollection is unbounded, but I was thinking of sending my messages
in batches to the external API in order to gain some performance
(I don't expect to send one HTTP request per message).

Thank you very much for all your responses!


On Sun, Apr 14, 2024 at 8:28 AM XQ Hu via user  wrote:
>
> To enrich your data, have you checked 
> https://cloud.google.com/dataflow/docs/guides/enrichment?
>
> This transform is built on top of 
> https://beam.apache.org/documentation/io/built-in/webapis/
>
> On Fri, Apr 12, 2024 at 4:38 PM Ruben Vargas  wrote:
>>
>> On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
>> >
>> > Here is an example from a book that I'm reading now and it may be 
>> > applicable.
>> >
>> > JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
>> > PYTHON - ord(id[0]) % 100
>>
>> Maybe this is what I'm looking for. I'll give it a try. Thanks!
>>
>> >
>> > On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  
>> > wrote:
>> >>
>> >> How about just keeping track of a buffer and flush the buffer after 100 
>> >> messages and if there is a buffer on finish_bundle as well?
>> >>
>> >>
>>
>> If this is in memory, It could lead to potential loss of data. That is
>> why the state is used or at least that is my understanding. but maybe
>> there is a way to do this in the state?
>>
>>
>> >> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  
>> >> wrote:
>> >>>
>> >>> Hello guys
>> >>>
>> >>> Maybe this question was already answered, but I cannot find it  and
>> >>> want some more input on this topic.
>> >>>
>> >>> I have some messages that don't have any particular key candidate,
>> >>> except the ID,  but I don't want to use it because the idea is to
>> >>> group multiple IDs in the same batch.
>> >>>
>> >>> This is my use case:
>> >>>
>> >>> I have an endpoint where I'm gonna send the message ID, this endpoint
>> >>> is gonna return me certain information which I will use to enrich my
>> >>> message. In order to avoid fetching the endpoint per message I want to
>> >>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>> >>> supports it) . I was thinking on using GroupIntoBatches.
>> >>>
>> >>> - If I choose the ID as the key, my understanding is that it won't
>> >>> work in the way I want (because it will form batches of the same ID).
>> >>> - Use a constant will be a problem for parallelism, is that correct?
>> >>>
>> >>> Then my question is, what should I use as a key? Maybe something
>> >>> regarding the timestamp? so I can have groups of messages that arrive
>> >>> at a certain second?
>> >>>
>> >>> Any suggestions would be appreciated
>> >>>
>> >>> Thanks.


Re: Any recommendation for key for GroupIntoBatches

2024-04-12 Thread Ruben Vargas
On Fri, Apr 12, 2024 at 2:17 PM Jaehyeon Kim  wrote:
>
> Here is an example from a book that I'm reading now and it may be applicable.
>
> JAVA - (id.hashCode() & Integer.MAX_VALUE) % 100
> PYTHON - ord(id[0]) % 100

Maybe this is what I'm looking for. I'll give it a try. Thanks!

>
> On Sat, 13 Apr 2024 at 06:12, George Dekermenjian  wrote:
>>
>> How about just keeping track of a buffer and flush the buffer after 100 
>> messages and if there is a buffer on finish_bundle as well?
>>
>>

If this is in memory, it could lead to potential loss of data. That is
why state is used, or at least that is my understanding. But maybe
there is a way to do this with state?


>> On Fri, Apr 12, 2024 at 21.23 Ruben Vargas  wrote:
>>>
>>> Hello guys
>>>
>>> Maybe this question was already answered, but I cannot find it  and
>>> want some more input on this topic.
>>>
>>> I have some messages that don't have any particular key candidate,
>>> except the ID,  but I don't want to use it because the idea is to
>>> group multiple IDs in the same batch.
>>>
>>> This is my use case:
>>>
>>> I have an endpoint where I'm gonna send the message ID, this endpoint
>>> is gonna return me certain information which I will use to enrich my
>>> message. In order to avoid fetching the endpoint per message I want to
>>> batch it in 100 and send the 100 IDs in one request ( the endpoint
>>> supports it) . I was thinking on using GroupIntoBatches.
>>>
>>> - If I choose the ID as the key, my understanding is that it won't
>>> work in the way I want (because it will form batches of the same ID).
>>> - Use a constant will be a problem for parallelism, is that correct?
>>>
>>> Then my question is, what should I use as a key? Maybe something
>>> regarding the timestamp? so I can have groups of messages that arrive
>>> at a certain second?
>>>
>>> Any suggestions would be appreciated
>>>
>>> Thanks.
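
For reference, combining the hash-bucket key idea above with GroupIntoBatches might look roughly like this (a sketch; MyEvent, getId() and the bucket count of 100 are placeholders):

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

PCollection<KV<Integer, Iterable<MyEvent>>> batches =
    events
        // Derive a small, stable key space (~100 buckets) from the ID's hash,
        // instead of keying by the ID itself or by a single constant.
        .apply(WithKeys.of((SerializableFunction<MyEvent, Integer>)
                e -> (e.getId().hashCode() & Integer.MAX_VALUE) % 100)
            .withKeyType(TypeDescriptors.integers()))
        // Emit batches of up to 100 elements per key.
        .apply(GroupIntoBatches.<Integer, MyEvent>ofSize(100));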


Any recommendation for key for GroupIntoBatches

2024-04-12 Thread Ruben Vargas
Hello guys

Maybe this question was already answered, but I cannot find it, and I
want some more input on this topic.

I have some messages that don't have any particular key candidate
except the ID, but I don't want to use the ID because the idea is to
group multiple IDs in the same batch.

This is my use case:

I have an endpoint where I'm gonna send the message ID, this endpoint
is gonna return me certain information which I will use to enrich my
message. In order to avoid fetching the endpoint per message I want to
batch it in 100 and send the 100 IDs in one request ( the endpoint
supports it) . I was thinking on using GroupIntoBatches.

- If I choose the ID as the key, my understanding is that it won't
work in the way I want (because it will form batches of the same ID).
- Use a constant will be a problem for parallelism, is that correct?

Then my question is, what should I use as a key? Maybe something
regarding the timestamp? so I can have groups of messages that arrive
at a certain second?

Any suggestions would be appreciated

Thanks.


Re: KV with AutoValueSchema

2024-04-09 Thread Ruben Vargas
Awesome, thanks for the info! It worked like a charm!

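For anyone who finds this thread later, a condensed sketch of the working shape described below; the Long key and the extractKey function are assumptions, not the actual code:

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// Create the schema-inferred PCollection first, then key it separately.
PCollection<SharedCoreEvent> coreEvents =
    input.apply(ParDo.of(new ProcessEvents()));

coreEvents
    .apply(WithKeys.<Long, SharedCoreEvent>of(evt -> extractKey(evt))  // extractKey(...) is hypothetical
        .withKeyType(TypeDescriptors.longs()))
    // Only needed if coder inference still fails at this point:
    .setCoder(KvCoder.of(VarLongCoder.of(), coreEvents.getCoder()))
    .apply(Reshuffle.of())
    .apply(Values.create());
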
On Thu, Apr 4, 2024 at 9:49 PM Reuven Lax  wrote:
>
> There are ways you can manually force the coder here. However I would first
> try to split up the KV creation into two operations. Have ProcessEvents just
> create a PCollection<SharedCoreEvent>, and then a following operation to
> create the KV. Something like this:
>
> input.apply(ParDo.of(new ProcessEvents()))
> .apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
> ExtractKeyFunction).withKeyType(TypeDescriptors.longs()));
>
> I suspect that this will allow the mechanism to better infer the final Coder.
> If that doesn't work, you could always brute force it like this:
>
> PCollection<SharedCoreEvent> coreEvents = input.apply(ParDo.of(new
> ProcessEvents()));
> coreEvents.apply(WithKeys.of((SerializableFunction<SharedCoreEvent, Long>)
> ExtractKeyFunction).withKeyType(TypeDescriptors.longs()))
>  .setCoder(KvCoder.of(LongCoder.of(), coreEvents.getCoder()))
>  .apply(Reshuffle.of())
>      ... etc.
>
>
> On Thu, Apr 4, 2024 at 8:19 PM Ruben Vargas  wrote:
>>
>> ProcessEvents receives a Session object as input and creates a KV with a SharedCoreEvent value as output
>>
>> On Thu, Apr 4, 2024 at 8:52 p.m., Reuven Lax via user
>>  wrote:
>>>
>>> There are some sharp edges unfortunately around auto-inference of KV coders 
>>> and schemas. Is there a previous PCollection of type SharedCoreEvent, or is 
>>> the SharedCoreEvent created in ProcessEvents?
>>>
>>> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas  wrote:
>>>>
>>>> Hello guys
>>>>
>>>> I have a question, is it possible to use KV along with AutoValueSchema
>>>> objects? I'm having troubles when I try to use it together.
>>>>
>>>> I have an object like the following
>>>>
>>>> @AutoValue
>>>> @DefaultSchema(AutoValueSchema.class)
>>>> public abstract class SharedCoreEvent {
>>>>
>>>> @JsonProperty("subscriptionId")
>>>> public abstract String getSubscription();
>>>>
>>>> 
>>>> }
>>>>
>>>> Then I have a pipeline like the following:
>>>>
>>>>  input.apply(ParDo.of(new ProcessEvents()))
>>>> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>>>>
>>>> My input is a single object and my ProcessEvents will produce tons of
>>>> events, in a fan-out fashion. that is why I used Reshuffle here
>>>>
>>>> But when I run this pipeline it throws the following error:
>>>>
>>>> lang.IllegalStateException: Unable to return a default Coder for
>>>> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
>>>> [PCollection@2131266396]. Correct one of the following root causes:
>>>>   No Coder has been manually specified;  you may do so using .setCoder().
>>>>
>>>>   Inferring a Coder from the CoderRegistry failed: Cannot provide
>>>> coder for parameterized type
>>>> org.apache.beam.sdk.values.KV:
>>>> Unable to provide a Coder for events.SharedCoreEvent
>>>>   Building a Coder using a registered CoderProvider failed.
>>>>
>>>>
>>>> Something similar happens with my source when I use KafkaIO and the
>>>> source produces a KV  PCollection.
>>>>
>>>> Is there any reason for this? Maybe I'm misusing the schemas?
>>>>
>>>> Really appreciate your help
>>>>
>>>> Thanks
>>>> Ruben


How to handle Inheritance with AutoValueSchema

2024-04-08 Thread Ruben Vargas
Hello Guys

I have a PCollection with a "Session" object; inside that object I
have a list of events.

Each event has a different type: EventA, EventB, EventC and so
on. All of them extend Event, which contains the common fields.

According to the AutoValue documentation, inheriting from another
AutoValue class is not supported, but extending a plain abstract class
to pick up its fields is.
(https://github.com/google/auto/blob/main/value/userguide/howto.md#-have-autovalue-also-implement-abstract-methods-from-my-supertypes)

But when I run my pipeline, it fails with an NPE.

Caused by: java.lang.NullPointerException
at 
org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createGetter(JavaBeanUtils.java:153)
~[beam-sdks-java-core-2.55.0.jar:?]
at 
org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$1(JavaBeanUtils.java:143)
~[beam-sdks-java-core-2.55.0.jar:?]
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
~[?:?]
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
~[?:?]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484) ~[?:?]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
~[?:?]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
~[?:?]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:?]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578) ~[?:?]
at 
org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$2(JavaBeanUtils.java:144)
~[beam-sdks-java-core-2.55.0.jar:?]
at 
java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
~[?:?]
at 
org.apache.beam.sdk.schemas.utils.JavaBeanUtils.getGetters(JavaBeanUtils.java:138)
~[beam-sdks-java-core-2.55.0.jar:?]
at 
org.apache.beam.sdk.schemas.AutoValueSchema.fieldValueGetters(AutoValueSchema.java:93)
~[beam-sdks-java-core-2.55.0.jar:?]
at 
org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:145)
~[beam-sdks-java-core-2.55.0.jar:?]
at 
org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$RowValueGettersFactory.create(GetterBasedSchemaProvider.java:130)
~[beam-sdks-java-core-2.55.0.jar:?]
at org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
~[beam-sdks-java-core-2.55.0.jar:?]


Is this not supported, or is it a bug? Should I file an issue on GitHub?

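(For context, the shape described above is roughly the following; class and field names are illustrative only, and each class would live in its own file:)

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// Plain abstract supertype (not @AutoValue) holding the common fields.
public abstract class Event {
  public abstract String getEventId();
}

// AutoValue generates implementations for its own getters and the inherited
// ones, per the "implement abstract methods from my supertypes" how-to.
@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract class EventA extends Event {
  public abstract String getPayloadA();
}
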
Thanks


Re: KV with AutoValueSchema

2024-04-04 Thread Ruben Vargas
ProcessEvents receives a Session object as input and creates a KV with a SharedCoreEvent value as output

On Thu, Apr 4, 2024 at 8:52 p.m., Reuven Lax via user <
user@beam.apache.org> wrote:

> There are some sharp edges unfortunately around auto-inference of KV
> coders and schemas. Is there a previous PCollection of type
> SharedCoreEvent, or is the SharedCoreEvent created in ProcessEvents?
>
> On Thu, Apr 4, 2024 at 2:12 PM Ruben Vargas 
> wrote:
>
>> Hello guys
>>
>> I have a question, is it possible to use KV along with AutoValueSchema
>> objects? I'm having troubles when I try to use it together.
>>
>> I have an object like the following
>>
>> @AutoValue
>> @DefaultSchema(AutoValueSchema.class)
>> public abstract class SharedCoreEvent {
>>
>> @JsonProperty("subscriptionId")
>> public abstract String getSubscription();
>>
>> 
>> }
>>
>> Then I have a pipeline like the following:
>>
>>  input.apply(ParDo.of(new ProcessEvents()))
>> .apply(Reshuffle.of()).apply(Values.create()).apply(output);
>>
>> My input is a single object and my ProcessEvents will produce tons of
>> events, in a fan-out fashion. that is why I used Reshuffle here
>>
>> But when I run this pipeline it throws the following error:
>>
>> lang.IllegalStateException: Unable to return a default Coder for
>> MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
>> [PCollection@2131266396]. Correct one of the following root causes:
>>   No Coder has been manually specified;  you may do so using .setCoder().
>>
>>   Inferring a Coder from the CoderRegistry failed: Cannot provide
>> coder for parameterized type
>> org.apache.beam.sdk.values.KV:
>> Unable to provide a Coder for events.SharedCoreEvent
>>   Building a Coder using a registered CoderProvider failed.
>>
>>
>> Something similar happens with my source when I use KafkaIO and the
>> source produces a KV  PCollection.
>>
>> Is there any reason for this? Maybe I'm misusing the schemas?
>>
>> Really appreciate your help
>>
>> Thanks
>> Ruben
>>
>


KV with AutoValueSchema

2024-04-04 Thread Ruben Vargas
Hello guys

I have a question: is it possible to use KV along with AutoValueSchema
objects? I'm having trouble when I try to use them together.

I have an object like the following

@AutoValue
@DefaultSchema(AutoValueSchema.class)
public abstract class SharedCoreEvent {

@JsonProperty("subscriptionId")
public abstract String getSubscription();


}

Then I have a pipeline like the following:

 input.apply(ParDo.of(new ProcessEvents()))
.apply(Reshuffle.of()).apply(Values.create()).apply(output);

My input is a single object and my ProcessEvents will produce tons of
events in a fan-out fashion; that is why I used Reshuffle here.

But when I run this pipeline it throws the following error:

lang.IllegalStateException: Unable to return a default Coder for
MCEvents/ParDo(ProcessEvents)/ParMultiDo(ProcessEvents).output
[PCollection@2131266396]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().

  Inferring a Coder from the CoderRegistry failed: Cannot provide
coder for parameterized type
org.apache.beam.sdk.values.KV:
Unable to provide a Coder for events.SharedCoreEvent
  Building a Coder using a registered CoderProvider failed.


Something similar happens with my source when I use KafkaIO and the
source produces a KV  PCollection.

Is there any reason for this? Maybe I'm misusing the schemas?

Really appreciate your help

Thanks
Ruben


DLQ Implementation

2024-03-27 Thread Ruben Vargas
Hello all

Maybe a silly question: are there any suggestions for implementing a DLQ
in my Beam pipeline?

Currently I'm using this library, https://github.com/tosun-si/asgarde, which
is not bad; the only issue I found is that sometimes it is hard to use
with GroupIntoBatches or other native transforms.

Then I saw this PR, https://github.com/apache/beam/pull/29164/files, which I
think is some sort of DLQ support? But I cannot find anything in the docs
about it, and I am not familiar enough with the Beam code to understand
it well.

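For what it's worth, a common hand-rolled alternative is a multi-output ParDo that routes failures to a dead-letter output; a rough sketch follows, where the String element type and parse(...) are assumptions:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

final TupleTag<String> okTag = new TupleTag<String>() {};
final TupleTag<String> deadLetterTag = new TupleTag<String>() {};

PCollectionTuple results =
    input.apply("ParseWithDlq",
        ParDo.of(new DoFn<String, String>() {
              @ProcessElement
              public void process(@Element String raw, MultiOutputReceiver out) {
                try {
                  out.get(okTag).output(parse(raw));      // parse(...) is hypothetical
                } catch (Exception e) {
                  out.get(deadLetterTag).output(raw);     // keep the raw payload for replay
                }
              }
            })
            .withOutputTags(okTag, TupleTagList.of(deadLetterTag)));

PCollection<String> ok = results.get(okTag);
PCollection<String> deadLetters = results.get(deadLetterTag);
// deadLetters can then be written to its own sink (a Kafka topic, S3, etc.).
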
I appreciate your help.


Thank you
-Ruben


Re: [Question] Side Input pattern

2023-09-15 Thread Ruben Vargas
Hello thanks for the reply

I was digging into the UnboundedReader interface, and I observed that some
implementations block the progress of the other inputs when they block
inside the advance() method (probably while waiting to see whether there
are new elements); an example of this is the AWS SqsIO implementation. If I
return true or false immediately, the progress of the main input continues,
but if I wait for results inside advance(), all of the other inputs get
blocked.


Is that assumption correct?



On Fri, Sep 15, 2023 at 10:59, Robert Bradshaw via user
 wrote:

> Beam will block on side inputs until at least one value is available (or
> the watermark has advanced such that we can be sure one will never become
> available, which doesn't really apply to the global window case).
> After that, workers generally cache the side input value (for performance
> reasons) but may periodically re-fetch it (the exact cadence probably
> depends on the runner implementation).
>
> On Tue, Sep 12, 2023 at 10:34 PM Ruben Vargas 
> wrote:
>
>> Hello Everyone
>>
>> I have a question, I have on my pipeline one side input that fetches some
>> configurations from an API endpoint each 30 seconds, my question is this.
>>
>>
>> I have something similar to what is showed in the side input patterns
>> documentation
>>
>>  PCollectionView> map =
>> p.apply(GenerateSequence.from(0).withRate(1,
>> Duration.standardSeconds(5L)))
>> .apply(
>> ParDo.of(
>> new DoFn>() {
>>
>>   @ProcessElement
>>   public void process(
>>   @Element Long input,
>>   @Timestamp Instant timestamp,
>>   OutputReceiver> o) {
>> call HTTP endpoint here!!
>>   }
>> }))
>> .apply(
>> Window.>into(new GlobalWindows())
>>
>> .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
>> .discardingFiredPanes())
>> .apply(Latest.globally())
>> .apply(View.asSingleton());
>>
>> What happens if for example the HTTP endpoint takes time to respond due
>> some network issues and/or the amount of data. Is this gonna introduce
>> delays on my main pipeline? Is the main pipeline blocked until the pardo
>> that processes the side input ends?
>>
>> I don't care too much about the consistency here, I mean if the
>> configuration changed in the Time T1 I don't care if some registries with
>> T2 timestamp are processed with the configuration version of T1.
>>
>>
>> Regards.
>>
>>


[Question] Side Input pattern

2023-09-12 Thread Ruben Vargas
Hello Everyone

I have a question. My pipeline has one side input that fetches some
configuration from an API endpoint every 30 seconds, and my question is this:


I have something similar to what is shown in the side input patterns
documentation:

 PCollectionView<Map<String, String>> map =
     p.apply(GenerateSequence.from(0).withRate(1,
             Duration.standardSeconds(5L)))
         .apply(
             ParDo.of(
                 new DoFn<Long, Map<String, String>>() {

                   @ProcessElement
                   public void process(
                       @Element Long input,
                       @Timestamp Instant timestamp,
                       OutputReceiver<Map<String, String>> o) {
                     // call HTTP endpoint here!!
                   }
                 }))
         .apply(
             Window.<Map<String, String>>into(new GlobalWindows())
                 .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                 .discardingFiredPanes())
         .apply(Latest.globally())
         .apply(View.asSingleton());

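(For completeness, the main pipeline reads that view per element roughly like this; MyEvent and applyConfig(...) are placeholders:)

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

PCollection<MyEvent> enriched =
    mainInput.apply("Enrich",
        ParDo.of(new DoFn<MyEvent, MyEvent>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // Workers cache the view's value and pick up newly triggered panes.
                Map<String, String> config = c.sideInput(map);
                c.output(applyConfig(c.element(), config));  // applyConfig(...) is hypothetical
              }
            })
            .withSideInputs(map));
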
What happens if, for example, the HTTP endpoint takes time to respond due to
network issues and/or the amount of data? Is this going to introduce
delays in my main pipeline? Is the main pipeline blocked until the ParDo
that processes the side input finishes?

I don't care too much about consistency here; I mean, if the
configuration changed at time T1, I don't care if some records with a
T2 timestamp are processed with the configuration version from T1.


Regards.


Re: Issue with growing state/checkpoint size

2023-09-01 Thread Ruben Vargas
Ohh, I see.

That makes sense. I'm wondering if there is a strategy for my use case,
where I have an ID that is unique per pair of messages.

Thanks for all your help!

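For reference, the lookup-based enrichment Sachin describes below would look roughly like this; RedisLookupClient is a hypothetical stand-in for whatever client library is used (Jedis, Lettuce, etc.), and the MyEvent methods are assumptions:

import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn;

// Enrich each element with a keyed lookup instead of a CoGroupByKey join, so no
// per-key window state has to be held while waiting for the other branch.
class EnrichFromLookup extends DoFn<MyEvent, MyEvent> {

  private transient RedisLookupClient client;  // hypothetical client wrapper

  @Setup
  public void setup() {
    // Assumed endpoint; one connection per DoFn instance.
    client = RedisLookupClient.connect("redis://config-store:6379");
  }

  @ProcessElement
  public void process(@Element MyEvent event, OutputReceiver<MyEvent> out) {
    Map<String, String> extra = client.lookup(event.getId());   // getId() is assumed
    out.output(event.withExtraFields(extra));                   // withExtraFields() is assumed
  }

  @Teardown
  public void teardown() {
    if (client != null) {
      client.close();
    }
  }
}
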
On Fri, Sep 1, 2023 at 6:51 AM Sachin Mittal  wrote:

> Yes a very high and non deterministic cardinality can make the stored
> state of join operation unbounded.
> In my case we know the cardinality and it was not very high so we could go
> with a lookup based approach using redis to enrich the stream and avoid
> joins.
>
>
>
> On Wed, Aug 30, 2023 at 5:04 AM Ruben Vargas 
> wrote:
>
>> Thanks for the reply and the advice
>>
>> One more thing, Do you know if the key-space carnality impacts on this?
>> I'm assuming it is, but the thing is for my case all the messages from the
>> sources has a unique ID, that makes my key-space huge and is not on my
>> control .
>>
>> On Tue, Aug 29, 2023 at 9:29 AM Sachin Mittal  wrote:
>>
>>> So for the smaller size of collection which does not grow with size for
>>> certain keys we stored the data in redis and instead of beam join in our
>>> DoFn we just did the lookup and got the data we need.
>>>
>>>
>>> On Tue, 29 Aug 2023 at 8:50 PM, Ruben Vargas 
>>> wrote:
>>>
>>>> Hello,
>>>>
>>>> Thanks for the reply, Any strategy you followed to avoid joins when you
>>>> rewrite your pipeline?
>>>>
>>>>
>>>>
>>>> On Tue, Aug 29, 2023 at 9:15 AM Sachin Mittal 
>>>> wrote:
>>>>
>>>>> Yes even we faced the same issue when trying to run a pipeline
>>>>> involving join of two collections. It was deployed using AWS KDA, which
>>>>> uses flink runner. The source was kinesis streams.
>>>>>
>>>>> Looks like join operations are not very efficient in terms of size
>>>>> management when run on flink.
>>>>>
>>>>> We had to rewrite our pipeline to avoid these joins.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>> On Tue, 29 Aug 2023 at 7:00 PM, Ruben Vargas 
>>>>> wrote:
>>>>>
>>>>>> Hello
>>>>>>
>>>>>> I experimenting an issue with my beam pipeline
>>>>>>
>>>>>> I have a pipeline in which I split the work into different branches,
>>>>>> then I do a join using CoGroupByKey, each message has its own unique Key.
>>>>>>
>>>>>> For the Join, I used a Session Window, and discarding the messages
>>>>>> after trigger.
>>>>>>
>>>>>> I'm using Flink Runner and deployed a KInesis application. But I'm
>>>>>> experiencing  an unbounded growth of the checkpoint data size. When I see
>>>>>> in Flink console, the  following task has the largest checkpoint
>>>>>>
>>>>>> join_results/GBK -> ToGBKResult ->
>>>>>> join_results/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult) -> V
>>>>>>
>>>>>>
>>>>>> Any Advice ?
>>>>>>
>>>>>> Thank you very much!
>>>>>>
>>>>>>


Re: Issue with growing state/checkpoint size

2023-08-29 Thread Ruben Vargas
Thanks for the reply and the advice

One more thing: do you know if the key-space cardinality has an impact on
this? I'm assuming it does, but the thing is, in my case all the messages
from the sources have a unique ID, which makes my key space huge, and that
is not in my control.

On Tue, Aug 29, 2023 at 9:29 AM Sachin Mittal  wrote:

> So for the smaller size of collection which does not grow with size for
> certain keys we stored the data in redis and instead of beam join in our
> DoFn we just did the lookup and got the data we need.
>
>
> On Tue, 29 Aug 2023 at 8:50 PM, Ruben Vargas 
> wrote:
>
>> Hello,
>>
>> Thanks for the reply, Any strategy you followed to avoid joins when you
>> rewrite your pipeline?
>>
>>
>>
>> On Tue, Aug 29, 2023 at 9:15 AM Sachin Mittal  wrote:
>>
>>> Yes even we faced the same issue when trying to run a pipeline involving
>>> join of two collections. It was deployed using AWS KDA, which uses flink
>>> runner. The source was kinesis streams.
>>>
>>> Looks like join operations are not very efficient in terms of size
>>> management when run on flink.
>>>
>>> We had to rewrite our pipeline to avoid these joins.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Tue, 29 Aug 2023 at 7:00 PM, Ruben Vargas 
>>> wrote:
>>>
>>>> Hello
>>>>
>>>> I experimenting an issue with my beam pipeline
>>>>
>>>> I have a pipeline in which I split the work into different branches,
>>>> then I do a join using CoGroupByKey, each message has its own unique Key.
>>>>
>>>> For the Join, I used a Session Window, and discarding the messages
>>>> after trigger.
>>>>
>>>> I'm using Flink Runner and deployed a KInesis application. But I'm
>>>> experiencing  an unbounded growth of the checkpoint data size. When I see
>>>> in Flink console, the  following task has the largest checkpoint
>>>>
>>>> join_results/GBK -> ToGBKResult ->
>>>> join_results/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult) -> V
>>>>
>>>>
>>>> Any Advice ?
>>>>
>>>> Thank you very much!
>>>>
>>>>


Re: Issue with growing state/checkpoint size

2023-08-29 Thread Ruben Vargas
Hello,

Thanks for the reply. Is there any strategy you followed to avoid joins
when you rewrote your pipeline?



On Tue, Aug 29, 2023 at 9:15 AM Sachin Mittal  wrote:

> Yes even we faced the same issue when trying to run a pipeline involving
> join of two collections. It was deployed using AWS KDA, which uses flink
> runner. The source was kinesis streams.
>
> Looks like join operations are not very efficient in terms of size
> management when run on flink.
>
> We had to rewrite our pipeline to avoid these joins.
>
> Thanks
> Sachin
>
>
> On Tue, 29 Aug 2023 at 7:00 PM, Ruben Vargas 
> wrote:
>
>> Hello
>>
>> I experimenting an issue with my beam pipeline
>>
>> I have a pipeline in which I split the work into different branches, then
>> I do a join using CoGroupByKey, each message has its own unique Key.
>>
>> For the Join, I used a Session Window, and discarding the messages after
>> trigger.
>>
>> I'm using Flink Runner and deployed a KInesis application. But I'm
>> experiencing  an unbounded growth of the checkpoint data size. When I see
>> in Flink console, the  following task has the largest checkpoint
>>
>> join_results/GBK -> ToGBKResult ->
>> join_results/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult) -> V
>>
>>
>> Any Advice ?
>>
>> Thank you very much!
>>
>>


Issue with growing state/checkpoint size

2023-08-29 Thread Ruben Vargas
Hello

I'm experiencing an issue with my Beam pipeline.

I have a pipeline in which I split the work into different branches, then I
do a join using CoGroupByKey; each message has its own unique key.

For the join, I used a session window, discarding the messages after
the trigger fires.

I'm using the Flink runner, deployed as a Kinesis Data Analytics
application, but I'm experiencing unbounded growth of the checkpoint data
size. When I look in the Flink console, the following task has the largest
checkpoint:

join_results/GBK -> ToGBKResult ->
join_results/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult) -> V


Any advice?

Thank you very much!


Re: [question]Best practices for branching pipeline.

2023-08-10 Thread Ruben Vargas
Hello, thank you very much for the reply.

I was thinking of branching because I have some heavy processes that I
would like to distribute to other workers and scale independently of the
lighter ones.

Does that make sense?

On Wed, Aug 9, 2023 at 12:16 PM John Casey via user 
wrote:

> Depending on the specifics of your processing, it may be simpler to just
> do both transforms within a single pardo.
>
> i.e.
>
> pipeline.apply(kafka.read())
> .apply(ParDo.of(new UserTransform()));
>
> public static class UserTransform extends DoFn<KafkaRecord<K, V>, MergedType> {
>
> @ProcessElement
> public void processElement(@Element KafkaRecord<K, V> record,
> OutputReceiver<MergedType> receiver) {
>   Type1 part1 = something(record);
>   Type2 part2 = somethingElse(record);
>   MergedType merged = merge(part1, part2);
>   receiver.output(merged);
> }
>
> }
>
>
>
> On Wed, Jul 26, 2023 at 11:43 PM Ruben Vargas 
> wrote:
>
>>
>> Hello?
>>
>> Any advice on how to do what I described? I can only found examples of
>> bounded data. Not for streaming.
>>
>>
>>
>> Aldo can I get invited to slack?
>>
>> Thank you very much!
>>
>> On Fri, Jul 21, 2023 at 9:34, Ruben Vargas <
>> ruben.var...@metova.com> wrote:
>>
>>> Hello,
>>>
>>> I'm starting using Beam and I would like to know if there is any
>>> recommended pattern for doing the following:
>>>
>>> I have a message coming from Kafka and then I would like to apply two
>>> different transformations and merge them in a single result at the end. I
>>> attached an image that describes the pipeline.
>>>
>>> Each message has its own unique key,
>>>
>>> What I'm doing is using a Session Window with a trigger
>>> elementCountAtLeast with the number equal to the number of process I
>>> expected to generate results (in the case of the diagram will be 2)
>>>
>>> This is the code fragment I used for construct the window:
>>>
>>> Window> joinWindow = Window.>> OUTPUT>>into(Sessions.withGapDuration(Duration.standardSeconds(60))).triggering(
>>>
>>> Repeatedly.forever(AfterPane.elementCountAtLeast(nProcessWait))
>>> ).discardingFiredPanes().withAllowedLateness(Duration.ZERO);
>>>
>>>
>>> and then a CoGroupKey to join all of the results. Is this a
>>> recommended approach? Or  is there a recommended way? What happens if at
>>> some points I have a lot of windows "open"?
>>>
>>>
>>> Thank you very much!
>>>
>>>


Re: [question]Best practices for branching pipeline.

2023-07-26 Thread Ruben Vargas
Hello?

Any advice on how to do what I described? I can only find examples for
bounded data, not for streaming.


Also, can I get invited to Slack?

Thank you very much!

On Fri, Jul 21, 2023 at 9:34, Ruben Vargas <
ruben.var...@metova.com> wrote:

> Hello,
>
> I'm starting using Beam and I would like to know if there is any
> recommended pattern for doing the following:
>
> I have a message coming from Kafka and then I would like to apply two
> different transformations and merge them in a single result at the end. I
> attached an image that describes the pipeline.
>
> Each message has its own unique key,
>
> What I'm doing is using a Session Window with a trigger
> elementCountAtLeast with the number equal to the number of process I
> expected to generate results (in the case of the diagram will be 2)
>
> This is the code fragment I used for construct the window:
>
> Window> joinWindow = Window. OUTPUT>>into(Sessions.withGapDuration(Duration.standardSeconds(60))).triggering(
>
> Repeatedly.forever(AfterPane.elementCountAtLeast(nProcessWait))
> ).discardingFiredPanes().withAllowedLateness(Duration.ZERO);
>
>
> and then a CoGroupKey to join all of the results. Is this a
> recommended approach? Or  is there a recommended way? What happens if at
> some points I have a lot of windows "open"?
>
>
> Thank you very much!
>
>


[question]Best practices for branching pipeline.

2023-07-21 Thread Ruben Vargas
Hello,

I'm starting to use Beam and I would like to know if there is any
recommended pattern for doing the following:

I have a message coming from Kafka, and I would like to apply two
different transformations and merge them into a single result at the end. I
attached an image that describes the pipeline.

Each message has its own unique key.

What I'm doing is using a session window with an elementCountAtLeast
trigger, with the count equal to the number of processes I expect to
generate results (in the case of the diagram, that will be 2).

This is the code fragment I used to construct the window:

Window<KV<String, OUTPUT>> joinWindow = Window.<KV<String, OUTPUT>>into(Sessions.withGapDuration(Duration.standardSeconds(60))).triggering(
        Repeatedly.forever(AfterPane.elementCountAtLeast(nProcessWait))
).discardingFiredPanes().withAllowedLateness(Duration.ZERO);


and then a CoGroupByKey to join all of the results. Is this a
recommended approach, or is there a better way? What happens if at
some point I have a lot of windows "open"?

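For concreteness, a rough sketch of the branch-and-join shape described above, with the session window omitted and EnrichA/EnrichB, ResultA/ResultB as placeholders:

import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

final TupleTag<ResultA> tagA = new TupleTag<ResultA>() {};
final TupleTag<ResultB> tagB = new TupleTag<ResultB>() {};

// Both branches consume the same keyed input and emit keyed results.
PCollection<KV<String, ResultA>> branchA = keyedInput.apply("EnrichA", ParDo.of(new EnrichA()));
PCollection<KV<String, ResultB>> branchB = keyedInput.apply("EnrichB", ParDo.of(new EnrichB()));

// The session window/trigger shown above would be applied to both branches here.
PCollection<KV<String, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(tagA, branchA)
        .and(tagB, branchB)
        .apply(CoGroupByKey.create());

// Downstream, result.getOnly(tagA) / result.getOnly(tagB) merges the two sides.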

Thank you very much!