Hello! I've never seen use-cases where it would be necessary. What are you trying to achieve? Some context would be helpful. Your example looks like you can split your app into two - one writes into streamName and the others read from streamName.
P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and is not maintained anymore. Better to use this instead: https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html Best Regards, Pavel Solomin Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin <https://www.linkedin.com/in/pavelsolomin> On Wed, 10 May 2023 at 10:50, Sachin Mittal <sjmit...@gmail.com> wrote: > Hi, > I am using aws beam sdk1 to read from and write to a kinesis stream. > *org.apache.beam.sdk.io.kinesis.KinesisIO* > > > My pipeline is something like this: (*note the kinesis stream used to > write to and then again read from is empty before starting the app*) > > --------------------------------------------------------------------------------------------------------------------------------------- > Pipeline pipeline = Pipeline.create(options); > > PCollection<> input = pipeline.apply(/* read from some source */); > > // populate an empty kinesis stream > input > .apply( > KinesisIO.write() > .withStreamName(streamName) > // other IO configs .... > ); > > // within same application start another pipeline > // to read from some kinesis stream from start > PCollection<> output = pipeline > .apply( > KinesisIO.read() > .withStreamName(streamName) > .withMaxReadTime(duration) // wait for some duration before deciding to > close the pipeline > .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) // > from start > // other IO configs > ) > .apply(/* apply other transformations */); > > > // write transformed output to same kinesis stream > output > .apply( > KinesisIO.write() > .withStreamName(streamName) > // other IO configs > ); > > // also write transformed output to some other kinesis stream > output > .apply( > KinesisIO.write() > .withStreamName(otherStreamName) // a different kinesis stream > // other IO configs > ); > > > pipeline.run().waitUntilFinish(); > > > --------------------------------------------------------------------------------------------------------------------------------------- > > Will something like this work in a single beam application ? > Is there a better way of designing this ? > > I am right now trying to run this using a direct runner but I am facing > some issues in reading from the same kinesis stream again. > It is actually able to read the records but somehow read records are not > pushed downstream for further processing. > > Before debugging it further and looking into any logic issues or bugs in > my code, I wanted to be sure if something like this is possible under beam > constructs. > > Please let me know your thoughts. > > Thanks > Sachin > >