Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-12 Thread Sachin Mittal
Only direct runner. I have right now disabled aggregation on kpl and it looks like to be working. On Sat, 13 May 2023 at 3:35 AM, Pavel Solomin wrote: > > 100,000's of data records are accumulated and they are tried to be > pushed to Kinesis all at once > > Does that happen only in direct

Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-12 Thread Pavel Solomin
> 100,000's of data records are accumulated and they are tried to be pushed to Kinesis all at once Does that happen only in direct runner? Or Flink runner behaves similarly? Best Regards, Pavel Solomin Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin

Question about metrics

2023-05-12 Thread hsy...@gmail.com
Hi I have questions about metrics. I want to use beam metrics api to send metrics to GCP monitoring. Instead of collecting just some simple numeric values. I also need to send labels along with them. Is there a way to do that? Thanks!

Re: Is there a way to generated bounded sequence emitted at a particular rate

2023-05-12 Thread Pavel Solomin
Direct runner was meant to be test-only runner, and not to be production-use runner, and I don't know if it behaves fine with batch processing of data bulks. Do you experience the same issues when you run everything on Flink runner? Beam codebase has integration tests with Direct runner - those

Re: Is there a way to generated bounded sequence emitted at a particular rate

2023-05-12 Thread Sachin Mittal
I am using a direct runner. If I remove the .withRate(1, Duration.standardSeconds(5) Then Kinesis IO writes to Kinesis, however it receives all the input records at once and then throws: *KPL Expiration reached while waiting in limiter* I suppose we have certain limitations with direct runner

Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-12 Thread Sachin Mittal
Hi, So I have prepared the write pipeline something like this: -- writePipeline .apply(GenerateSequence.from(0).to(100)) .apply(ParDo.of(new DoFn() { @ProcessElement public void

Re: Is there a way to generated bounded sequence emitted at a particular rate

2023-05-12 Thread Pavel Solomin
Hello! > this does not seem to be generating numbers at that rate which is 1 per 5 seconds but all at one time What runner do you use? I've seen that behavior of GenerateSequence only in Direct runner. > Also looks like it may be creating an unbounded collection and looks like kinesis is not

Is there a way to generated bounded sequence emitted at a particular rate

2023-05-12 Thread Sachin Mittal
Hi, I want to emit a bounded sequence of numbers from 0 to n but downstream to receive this sequence at a given rate. This is needed so that we can rate limit the HTTP request downstream. Say if we generate sequence from 1 - 100 then downstream would make 100 such requests almost at the same