Hello!

I've never seen use-cases where it would be necessary. What are you trying
to achieve? Some context would be helpful.
Your example looks like you can split your app into two - one writes into
streamName and the others read from streamName.

P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and is
not maintained anymore. Better to use this instead:
https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html

Best Regards,
Pavel Solomin

Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
<https://www.linkedin.com/in/pavelsolomin>





On Wed, 10 May 2023 at 10:50, Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I am using aws beam sdk1 to read from and write to a kinesis stream.
> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>
>
> My pipeline is something like this: (*note the kinesis stream used to
> write to and then again read from is empty before starting the app*)
>
> ---------------------------------------------------------------------------------------------------------------------------------------
> Pipeline pipeline = Pipeline.create(options);
>
> PCollection<> input = pipeline.apply(/* read from some source */);
>
> // populate an empty kinesis stream
> input
> .apply(
> KinesisIO.write()
> .withStreamName(streamName)
> // other IO configs ....
> );
>
> // within same application start another pipeline
> // to read from some kinesis stream from start
> PCollection<> output = pipeline
> .apply(
> KinesisIO.read()
> .withStreamName(streamName)
> .withMaxReadTime(duration) // wait for some duration before deciding to
> close the pipeline
> .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) //
> from start
> // other IO configs
> )
> .apply(/* apply other transformations */);
>
>
> // write transformed output to same kinesis stream
> output
> .apply(
> KinesisIO.write()
> .withStreamName(streamName)
> // other IO configs
> );
>
> // also write transformed output to some other kinesis stream
> output
> .apply(
> KinesisIO.write()
> .withStreamName(otherStreamName) // a different kinesis stream
> // other IO configs
> );
>
>
> pipeline.run().waitUntilFinish();
>
>
> ---------------------------------------------------------------------------------------------------------------------------------------
>
> Will something like this work in a single beam application ?
> Is there a better way of designing this ?
>
> I am right now trying to run this using a direct runner but I am facing
> some issues in reading from the same kinesis stream again.
> It is actually able to read the records but somehow read records are not
> pushed downstream for further processing.
>
> Before debugging it further and looking into any logic issues or bugs in
> my code, I wanted to be sure if something like this is possible under beam
> constructs.
>
> Please let me know your thoughts.
>
> Thanks
> Sachin
>
>

Reply via email to