> Hundreds of thousands of data records are accumulated and then pushed
> to Kinesis all at once

Does that happen only with the direct runner, or does the Flink runner behave similarly?

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Fri, 12 May 2023 at 16:43, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> So I have prepared the write pipeline, something like this:
>
>
> --------------------------------------------------------------------------------------------------------------
> writePipeline
>     .apply(GenerateSequence.from(0).to(100))
>     .apply(ParDo.of(new DoFn<Long, byte[]>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         long i = c.element();
>         // Fetching data for step=i
>         List<Data> data = fetchForInputStep(i);
>         // output all the data one by one
>         for (Data d : data) {
>           c.output(d.asBytes());
>         }
>       }
>     }))
>     .apply(KinesisIO.write()
>         .withStreamName(streamName)
>         // other configs
>     );
>
> writePipeline.run().waitUntilFinish();
>
> What I observe is that the part of the pipeline that pushes data to Kinesis
> only runs after the entire data set has been loaded by the second apply step.
> So hundreds of thousands of data records get accumulated and are then pushed
> to Kinesis all at once, and we get the following error:
> *KPL Expiration reached while waiting in limiter*
>
> The logs are generated like this:
>
> --------------------------------------------------------------------------------------------------------------
> Extracting binaries to
> /var/folders/30/knyj9z4d3psbd4s6kffqc5000000gn/T/amazon-kinesis-producer-native-binaries
> .........
> [main.cc:384] Starting up main producer
> .........
> [main.cc:395] Entering join
> .........
> Fetching data for step=1
> .........
> Fetching data for step=100
> .........
> [kinesis_producer.cc:200] Created pipeline for stream "xxxxxx"
> [shard_map.cc:87] Updating shard map for stream "xxxxxx"
> [shard_map.cc:148] Successfully updated shard map for stream "xxxxxx"
> found 1 shards
> [processing_statistics_logger.cc:111] Stage 1 Triggers: { stream:
> 'xxxxxx', manual: 10, count: 0, size: 4688, matches: 0, timed: 0,
> UserRecords: 742018, KinesisRecords: 4698 }
>
>
> I had assumed that as soon as the data for step 1 was fetched it would be
> passed downstream, and that the Kinesis producer pipeline would be created
> much earlier and start writing to Kinesis right away, but this only happens
> after all the data has been collected.
>
> Is there a way to fix this?
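>
> One workaround I am considering (just a sketch, assuming the records are being
> buffered inside a single fused bundle before the write step) is to add a
> Reshuffle between the fetch DoFn and KinesisIO.write(), roughly:
>
> --------------------------------------------------------------------------------------------------------------
> writePipeline
>     .apply(GenerateSequence.from(0).to(100))
>     .apply(ParDo.of(new DoFn<Long, byte[]>() { /* fetch and output, as above */ }))
>     // break fusion so the writer can start before all steps have been fetched
>     .apply(Reshuffle.viaRandomKey())
>     .apply(KinesisIO.write()
>         .withStreamName(streamName)
>         // other configs
>     );
>
> I am not sure this is the right fix. The legacy writer also seems to accept
> KPL producer properties (withProducerProperties), which might be another knob
> for the rate limiter, but I have not verified that.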
>
> Thanks
> Sachin
>
>
>
> On Wed, May 10, 2023 at 4:29 PM Pavel Solomin <p.o.solo...@gmail.com>
> wrote:
>
>> > two pipeline objects in my application
>>
>> I think this should work. I meant having 2 separate artifacts and deploying
>> them separately, but if your app runs batch processing with 2 sequential
>> steps, 2 pipelines should work too (rough sketch below):
>>
>> - writePipeline.run().waitUntilFinish()
>> - readAndWritePipeline.run().waitUntilFinish()
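>>
>> A rough sketch of what I mean (the pipeline names and placeholder transforms
>> are illustrative, not a definitive implementation):
>>
>> --------------------------------------------------------------------------------------------------------------
>> Pipeline writePipeline = Pipeline.create(options);
>> writePipeline
>>     .apply(/* read from some source */)
>>     .apply(KinesisIO.write().withStreamName(streamName) /* other configs */);
>> // block until the stream is fully populated
>> writePipeline.run().waitUntilFinish();
>>
>> Pipeline readAndWritePipeline = Pipeline.create(options);
>> readAndWritePipeline
>>     .apply(KinesisIO.read()
>>         .withStreamName(streamName)
>>         .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>>         .withMaxReadTime(duration))
>>     .apply(/* other transformations */)
>>     .apply(KinesisIO.write().withStreamName(otherStreamName) /* other configs */);
>> // starts only after the first pipeline has finished
>> readAndWritePipeline.run().waitUntilFinish();
>>
>> The second run() only begins once waitUntilFinish() on the first one has
>> returned, so the read always sees a fully written stream.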
>>
>> Best Regards,
>> Pavel Solomin
>>
>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>> <https://www.linkedin.com/in/pavelsolomin>
>>
>>
>>
>>
>>
>> On Wed, 10 May 2023 at 11:49, Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> The use case is something like this:
>>> A source writes data to Kinesis, and that data is used to compute derived
>>> data, which is written back to the same stream so that the next level of
>>> derived data can be computed from the previous level, and so on.
>>>
>>> Would there be any issues on the Beam side with doing this within a single
>>> pipeline?
>>>
>>> When you say I have to split my app into two, do you mean that I have to
>>> create two pipeline objects in my application?
>>>
>>> If so, how will the application end?
>>>
>>> Note that the source is of finite size, and all of it gets written into
>>> Kinesis.
>>>
>>> Also, we do plan to migrate to the aws2 IO, but later. If aws1 has
>>> limitations in achieving what we want, please let me know.
>>>
>>> Thanks
>>>
>>>
>>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin <p.o.solo...@gmail.com>
>>> wrote:
>>>
>>>> Hello!
>>>>
>>>> I've never seen use-cases where this would be necessary. What are you
>>>> trying to achieve? Some context would be helpful.
>>>> Your example looks like you could split your app into two - one that writes
>>>> into streamName and another that reads from streamName.
>>>>
>>>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is a legacy connector and
>>>> is no longer maintained. Better to use this instead:
>>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
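>>>>
>>>> For example, the read side with the aws2 connector looks roughly like this
>>>> (a sketch from memory - please double-check the method names against the
>>>> javadoc linked above; I believe InitialPositionInStream here comes from the
>>>> KCL v2 packages):
>>>>
>>>> PCollection<KinesisRecord> records =
>>>>     pipeline.apply(
>>>>         KinesisIO.read()
>>>>             .withStreamName(streamName)
>>>>             .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>>>>             .withMaxReadTime(duration));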
>>>>
>>>> Best Regards,
>>>> Pavel Solomin
>>>>
>>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>>> <https://www.linkedin.com/in/pavelsolomin>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am using aws beam sdk1 to read from and write to a kinesis stream.
>>>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>>>
>>>>>
>>>>> My pipeline is something like this (*note: the Kinesis stream that is
>>>>> written to and then read from again is empty before the app starts*):
>>>>>
>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>> Pipeline pipeline = Pipeline.create(options);
>>>>>
>>>>> PCollection<byte[]> input = pipeline.apply(/* read from some source */);
>>>>>
>>>>> // populate an empty kinesis stream
>>>>> input.apply(
>>>>>     KinesisIO.write()
>>>>>         .withStreamName(streamName)
>>>>>         // other IO configs ....
>>>>> );
>>>>>
>>>>> // within the same application, start another branch of the pipeline
>>>>> // to read the same kinesis stream from the start
>>>>> PCollection<byte[]> output = pipeline
>>>>>     .apply(
>>>>>         KinesisIO.read()
>>>>>             .withStreamName(streamName)
>>>>>             // wait for some duration before deciding to close the pipeline
>>>>>             .withMaxReadTime(duration)
>>>>>             // read from the start of the stream
>>>>>             .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
>>>>>             // other IO configs
>>>>>     )
>>>>>     .apply(/* apply other transformations */);
>>>>>
>>>>> // write transformed output to the same kinesis stream
>>>>> output.apply(
>>>>>     KinesisIO.write()
>>>>>         .withStreamName(streamName)
>>>>>         // other IO configs
>>>>> );
>>>>>
>>>>> // also write transformed output to some other kinesis stream
>>>>> output.apply(
>>>>>     KinesisIO.write()
>>>>>         .withStreamName(otherStreamName) // a different kinesis stream
>>>>>         // other IO configs
>>>>> );
>>>>>
>>>>> pipeline.run().waitUntilFinish();
>>>>>
>>>>>
>>>>> ---------------------------------------------------------------------------------------------------------------------------------------
>>>>>
>>>>> Will something like this work in a single Beam application?
>>>>> Is there a better way of designing this?
>>>>>
>>>>> Right now I am trying to run this with the direct runner, but I am facing
>>>>> some issues reading from the same kinesis stream again.
>>>>> It is actually able to read the records, but somehow the read records are
>>>>> not pushed downstream for further processing.
>>>>>
>>>>> Before debugging this further and looking for logic issues or bugs in my
>>>>> code, I wanted to be sure that something like this is possible with Beam
>>>>> constructs.
>>>>>
>>>>> Please let me know your thoughts.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
