Difference between sdk.io.aws2.kinesis.KinesisIO vs sdk.io.kinesis.KinesisIO

2022-08-31 Thread Sachin Mittal
Hello folks,
We are consuming from a kinesis stream in our beam application.
So far we were using *sdk.io.kinesis.KinesisIO* to read from Kinesis.
What I understand is that this library does not use Enhanced Fan-Out
Consumer provided by AWS.

Recently I saw the library *sdk.io.aws2*.kinesis.KinesisIO and I wanted to
understand the purpose of this new KinesisIO for reading from and writing to
Kinesis.

Does this new library use Enhanced Fan-Out Consumer ?

When and how should we decide whether to migrate to the new library or
continue using the current one ?

Thanks
Sachin


Can we shutdown a pipeline based on some condition

2023-05-04 Thread Sachin Mittal
Hi,
I am kind of building a batch/streaming hybrid beam application.
Data is fed into a kinesis stream and the beam pipeline is run.

I want to stop the pipeline if no new data is fed into the stream for a
certain period of time, say 5 minutes.

Is there a way of achieving this ?

Right now I only see something like this:

pipeline.run().waitUntilFinish();

Instead of waiting until finish, can we have some kind of conditional finish ?
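
For reference, a minimal sketch (not from the thread, and it bounds total run
time rather than detecting idleness) of cancelling a run after a fixed deadline:

PipelineResult result = pipeline.run();
// Wait at most 5 minutes; if the pipeline has not reached a terminal state by then, cancel it.
PipelineResult.State state = result.waitUntilFinish(Duration.standardMinutes(5));
if (state == null || !state.isTerminal()) {
  try {
    result.cancel();
  } catch (IOException e) {
    // log and decide whether to retry the cancellation
  }
}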

Thanks
Sachin


How to identify what objects in your code have to be serialized

2023-05-08 Thread Sachin Mittal
I am trying to create a pipeline where I query paginated data from some
external service via a client and join them into a PCollectionList and
flatten it to get the final collection of data items.
The data class is encoded using a ProtoCoder.

Here is my code:
-
private static PCollection<Data> loadAllData(
    Pipeline pipeline, DataClient client, Instant from, Instant to) {

  PaginatedList<Data> paginatedData = getPaginatedData(client, from, to, 0);
  int total = paginatedData.meta().total();
  int limit = paginatedData.meta().limit();

  List<PCollection<Data>> collections = new ArrayList<>();

  PCollection<Data> collection =
      pipeline.apply(
          Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
  collections.add(collection);

  for (int i = 1; i < total / limit + 1; i++) {
    paginatedData = getPaginatedData(client, from, to, i * limit);
    collection =
        pipeline.apply(
            Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class)));
    collections.add(collection);
  }
  PCollectionList<Data> list = PCollectionList.of(collections);

  return list
      .apply(Flatten.pCollections())
      .setCoder(ProtoCoder.of(Data.class));
}
---

When I run this pipeline it complains at each step that objects like
DataClient have to be serializable.
Even objects created inside the above method, for example PaginatedList, have
to be serializable.

However, many of the classes I use, like DataClient and PaginatedList, are
part of a third-party library and don't implement Serializable. So is there
any way to ensure Beam is able to serialize them ?

Overall, when designing a pipeline, how can we identify which objects will
have to be serialized ?

Instead of creating this static method, if I create a non-static method and
implement Serializable in the base class which contains this method, would
this help ?
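
For reference, a minimal sketch (an illustration, not from the thread) of keeping
a non-serializable client out of the serialized pipeline graph: only serializable
configuration travels with the DoFn, and the client is built per worker in @Setup.
DataClient.create() and the simplified getPaginatedData() helper are placeholders
standing in for the third-party API above.

static class FetchPageFn extends DoFn<Integer, Data> {
  private final String endpoint;        // serializable configuration
  private transient DataClient client;  // non-serializable, never serialized by Beam

  FetchPageFn(String endpoint) {
    this.endpoint = endpoint;
  }

  @Setup
  public void setup() {
    client = DataClient.create(endpoint); // placeholder factory, runs on the worker
  }

  @ProcessElement
  public void process(@Element Integer offset, OutputReceiver<Data> out) {
    for (Data d : getPaginatedData(client, offset).data()) { // placeholder helper
      out.output(d);
    }
  }
}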

Thanks
Sachin


Re: How to identify what objects in your code have to be serialized

2023-05-08 Thread Sachin Mittal
lds that client as a member, by any chance?
>
> Please take a look here for a bit of context:
> https://beam.apache.org/documentation/programming-guide/#requirements-for-writing-user-code-for-beam-transforms
>
>
> Best,
> Bruno
>
>
>
>
>
>
>
>
>
> On Tue, May 9, 2023 at 12:30 AM Sachin Mittal  wrote:
>
>> I am trying to create a pipeline where I query paginated data from some
>> external service via a client and join them into a PCollectionList and
>> flatten it to get the final collection of data items.
>> The data class is encoded using a ProtoCoder
>>
>> Here is my code:
>>
>> -
>> private static PCollection loadAllData(
>> Pipeline pipeline, DataClient client, Instant from, Instant to) {
>>
>> PaginatedList paginatedData = getPaginatedData(client, from, to, 0
>> );
>> int total = paginatedData.meta().total();
>> int limit = paginatedData.meta().limit();
>>
>> List> collections = new ArrayList<>();
>>
>> PCollection collection =
>> pipeline
>> .apply(Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class
>> )));
>> collections.add(collection);
>>
>> for (int i = 1; i < total / limit + 1; i++) {
>> paginatedData = getPaginatedData(client, from, to, i * limit);
>> collection =
>> pipeline
>> .apply(Create.of(paginatedData.data()).withCoder(ProtoCoder.of(Data.class
>> )));
>> observationsCollections.add(collection);
>> }
>> PCollectionList list = PCollectionList.of(collections);
>>
>> return list
>> .apply(Flatten.pCollections())
>> .setCoder(ProtoCoder.of(Data.class));
>> }
>>
>> ---
>>
>> When I run this pipeline it is complaining at each step like DataClient
>> has to be serializable.
>> Even any objects created in the above method have to be serializable, for
>> example PaginatedList.
>>
>> However many of the classes I use like DataClient and PaginatedList are
>> part of some third party library and
>> they don't implement serializable. So is there any way to ensure beam is
>> able to serialize them ?
>>
>> Overall in general when designing a pipeline how can we identify what all
>> objects would have to be serialized.
>>
>> Instead of creating this static method, if I create a non-static method
>> and implement serializable in the base class which contains this method,
>> would this help ?
>>
>> Thanks
>> Sachin
>>
>>
>>


Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-10 Thread Sachin Mittal
Hi,
I am using aws beam sdk1 to read from and write to a kinesis stream.
*org.apache.beam.sdk.io.kinesis.KinesisIO*


My pipeline is something like this: (*note the kinesis stream used to write
to and then again read from is empty before starting the app*)
---
Pipeline pipeline = Pipeline.create(options);

PCollection<> input = pipeline.apply(/* read from some source */);

// populate an empty kinesis stream
input
.apply(
KinesisIO.write()
.withStreamName(streamName)
// other IO configs 
);

// within same application start another pipeline
// to read from some kinesis stream from start
PCollection<> output = pipeline
.apply(
KinesisIO.read()
.withStreamName(streamName)
.withMaxReadTime(duration) // wait for some duration before deciding to
close the pipeline
.withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) // from
start
// other IO configs
)
.apply(/* apply other transformations */);


// write transformed output to same kinesis stream
output
.apply(
KinesisIO.write()
.withStreamName(streamName)
// other IO configs
);

// also write transformed output to some other kinesis stream
output
.apply(
KinesisIO.write()
.withStreamName(otherStreamName) // a different kinesis stream
// other IO configs
);


pipeline.run().waitUntilFinish();

---

Will something like this work in a single beam application ?
Is there a better way of designing this ?

I am right now trying to run this using the direct runner, but I am facing
some issues in reading from the same Kinesis stream again.
It is actually able to read the records, but somehow the read records are not
pushed downstream for further processing.

Before debugging it further and looking for any logic issues or bugs in my
code, I wanted to be sure whether something like this is possible under Beam
constructs.

Please let me know your thoughts.

Thanks
Sachin


Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-10 Thread Sachin Mittal
The use case is something like this:
A source writes data to Kinesis, and the same stream is used to compute
derived data, which is written back to the same stream so that the next level
of derived data can be computed from the previous one, and so on.

Would there be any issues from the Beam side in doing this within a single
pipeline?

When you say I have to split my app into two, do you mean that I have to
create two pipeline objects in my application?

If so, then how will the application end?

Note that the source is of finite size and gets written into Kinesis.

Also, we do plan to migrate to the aws2 IO, but later. If aws1 has some
limitations in achieving what we want, then please let me know.
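
For reference, a minimal sketch of the "two pipeline objects" structure discussed
below, assuming a bounded source; sourceTransform, transformations and the IO
configs are placeholders:

Pipeline writePipeline = Pipeline.create(options);
writePipeline
    .apply(sourceTransform) // placeholder for the bounded source read
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        /* other IO configs */);
writePipeline.run().waitUntilFinish();

Pipeline readAndWritePipeline = Pipeline.create(options);
readAndWritePipeline
    .apply(KinesisIO.read()
        .withStreamName(streamName)
        .withMaxReadTime(duration) // bounds the otherwise unbounded read
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
        /* other IO configs */)
    .apply(transformations) // placeholder for the derived-data computation
    .apply(KinesisIO.write()
        .withStreamName(otherStreamName)
        /* other IO configs */);
readAndWritePipeline.run().waitUntilFinish();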

Thanks


On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin  wrote:

> Hello!
>
> I've never seen use-cases where it would be necessary. What are you trying
> to achieve? Some context would be helpful.
> Your example looks like you can split your app into two - one writes into
> streamName and the others read from streamName.
>
> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and is
> not maintained anymore. Better to use this instead:
> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Wed, 10 May 2023 at 10:50, Sachin Mittal  wrote:
>
>> Hi,
>> I am using aws beam sdk1 to read from and write to a kinesis stream.
>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>
>>
>> My pipeline is something like this: (*note the kinesis stream used to
>> write to and then again read from is empty before starting the app*)
>>
>> ---
>> Pipeline pipeline = Pipeline.create(options);
>>
>> PCollection<> input = pipeline.apply(/* read from some source */);
>>
>> // populate an empty kinesis stream
>> input
>> .apply(
>> KinesisIO.write()
>> .withStreamName(streamName)
>> // other IO configs 
>> );
>>
>> // within same application start another pipeline
>> // to read from some kinesis stream from start
>> PCollection<> output = pipeline
>> .apply(
>> KinesisIO.read()
>> .withStreamName(streamName)
>> .withMaxReadTime(duration) // wait for some duration before deciding to
>> close the pipeline
>> .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON) //
>> from start
>> // other IO configs
>> )
>> .apply(/* apply other transformations */);
>>
>>
>> // write transformed output to same kinesis stream
>> output
>> .apply(
>> KinesisIO.write()
>> .withStreamName(streamName)
>> // other IO configs
>> );
>>
>> // also write transformed output to some other kinesis stream
>> output
>> .apply(
>> KinesisIO.write()
>> .withStreamName(otherStreamName) // a different kinesis stream
>> // other IO configs
>> );
>>
>>
>> pipeline.run().waitUntilFinish();
>>
>>
>> ---
>>
>> Will something like this work in a single beam application ?
>> Is there a better way of designing this ?
>>
>> I am right now trying to run this using a direct runner but I am facing
>> some issues in reading from the same kinesis stream again.
>> It is actually able to read the records but somehow read records are not
>> pushed downstream for further processing.
>>
>> Before debugging it further and looking into any logic issues or bugs in
>> my code, I wanted to be sure if something like this is possible under beam
>> constructs.
>>
>> Please let me know your thoughts.
>>
>> Thanks
>> Sachin
>>
>>


Is there a way to generate a bounded sequence emitted at a particular rate

2023-05-12 Thread Sachin Mittal
Hi,
I want to emit a bounded sequence of numbers from 0 to n, but have downstream
receive this sequence at a given rate.

This is needed so that we can rate limit the HTTP requests made downstream.

Say we generate a sequence from 1 to 100; then downstream would make 100
such requests at almost the same time.

So, to add gaps, I am trying something like this.

Would code like this work ?
pipeline
    .apply(GenerateSequence.from(0).to(100)
        .withRate(1, Duration.standardSeconds(5)))
    .apply(ParDo.of(new BatchDataLoad()))
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        // other configs
    );


Somehow this does not seem to generate numbers at that rate (1 every 5
seconds); they all arrive at one time.
It also looks like it may be creating an unbounded collection, and KinesisIO
does not seem to write anything to the stream.

If not then is there a way to achieve this?

Thanks
Sachin


Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-12 Thread Sachin Mittal
Hi,
So I have prepared the write pipeline something like this:

--
writePipeline
    .apply(GenerateSequence.from(0).to(100))
    .apply(ParDo.of(new DoFn<Long, byte[]>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        long i = c.element();
        // Fetching data for step=i
        List<Data> data = fetchForInputStep(i);
        // output all the data one by one
        for (Data d : data) {
          c.output(d.asBytes());
        }
      }
    }))
    .apply(KinesisIO.write()
        .withStreamName(streamName)
        // other configs
    );

writePipeline.run().waitUntilFinish();

What I observe is that the part of the pipeline that pushes data to Kinesis
only runs after the entire data set has been loaded by the second apply.
So hundreds of thousands of data records accumulate, they are all pushed to
Kinesis at once, and we get the following error:
*KPL Expiration reached while waiting in limiter*

The logs are generated like this:
--
Extracting binaries to
/var/folders/30/knyj9z4d3psbd4s6kffqc500gn/T/amazon-kinesis-producer-native-binaries
.
[main.cc:384] Starting up main producer
.
[main.cc:395] Entering join
.
Fetching data for step=1
.
Fetching data for step=100
.
[kinesis_producer.cc:200] Created pipeline for stream "xx"
[shard_map.cc:87] Updating shard map for stream "xx"
[shard_map.cc:148] Successfully updated shard map for stream "xx" found
1 shards
[processing_statistics_logger.cc:111] Stage 1 Triggers: { stream: 'xx',
manual: 10, count: 0, size: 4688, matches: 0, timed: 0, UserRecords:
742018, KinesisRecords: 4698 }


I had assumed that as soon as the step 1 data was fetched it would be passed
downstream, and that the Kinesis producer pipeline would have been created
much earlier and would have started writing to Kinesis much sooner, but this
happens only after all the data is collected.

Is there a way to fix this ?

Thanks
Sachin



On Wed, May 10, 2023 at 4:29 PM Pavel Solomin  wrote:

> > two pipeline objects in my application
>
> I think this should work. I meant to have 2 separate artifacts and deploy
> them separately, but if your app runs batch processing with 2 sequential
> steps, 2 pipelines should work too:
>
> - writePipeline.run().waitUntilFinish()
> - readAndWritePipeline.run().waitUntilFinish()
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Wed, 10 May 2023 at 11:49, Sachin Mittal  wrote:
>
>> Use case is something like this:
>> A source writes source data to kinesis and same is used to compute
>> derived data which is again written back to same stream so next level of
>> derived data can be computed from previous derived data and so on.
>>
>> Would there be any issues from beam side to do the same within a single
>> pipeline?
>>
>> When you say I have to split my app into two do you mean that I have to
>> create two pipeline objects in my application?
>>
>> If so then how will application end?
>>
>> Note that source is of finite size which gets written into kinesis.
>>
>> Also we do plan to migrate to aws2 io, but later. If aws1 has some
>> limitations in achieving what we want then please let me know.
>>
>> Thanks
>>
>>
>> On Wed, 10 May 2023 at 3:32 PM, Pavel Solomin 
>> wrote:
>>
>>> Hello!
>>>
>>> I've never seen use-cases where it would be necessary. What are you
>>> trying to achieve? Some context would be helpful.
>>> Your example looks like you can split your app into two - one writes
>>> into streamName and the others read from streamName.
>>>
>>> P.S.: org.apache.beam.sdk.io.kinesis.KinesisIO is legacy connector and
>>> is not maintained anymore. Better to use this instead:
>>> https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/aws2/kinesis/KinesisIO.html
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 10 May 2023 at 10:50, Sachin Mittal  wrote:
>>>
>>>> Hi,
>>>> I am using aws beam sdk1 to read from and write to a kinesis stream.
>>>> *org.apache.beam.sdk.io.kinesis.KinesisIO*
>>>>
>>>>
>>>> My pipeline is something like this: (*note the kinesis stream used to
>&

Re: Is there a way to generate a bounded sequence emitted at a particular rate

2023-05-12 Thread Sachin Mittal
I am using a direct runner.

If I remove the
.withRate(1, Duration.standardSeconds(5)

Then Kinesis IO writes to Kinesis, however it receives all the input
records at once and then throws:
*KPL Expiration reached while waiting in limiter*

I suppose we have certain limitations with direct runner (which I am only
using for writing test cases).
Real example will run on flink runner.

Thanks
Sachin


On Fri, May 12, 2023 at 9:09 PM Pavel Solomin  wrote:

> Hello!
>
> > this does not seem to be generating numbers at that rate which is 1 per
> 5 seconds but all at one time
>
> What runner do you use? I've seen that behavior of GenerateSequence only
> in Direct runner.
>
> > Also looks like it may be creating an unbounded collection and looks
> like kinesis is not writing anything to the stream.
>
> Never seen that happening, and I used KinesisIO quite a lot recently in my
> playgrounds - in the same way you use, generating sequences and writing to
> Kinesis. Can you share a full reproducible example of stuck KinesisIO?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Fri, 12 May 2023 at 15:04, Sachin Mittal  wrote:
>
>> Hi,
>> I want to emit a bounded sequence of numbers from 0 to n but downstream
>> to receive this sequence at a given rate.
>>
>> This is needed so that we can rate limit the HTTP request downstream.
>>
>> Say if we generate sequence from 1 - 100 then downstream would make 100
>> such requests almost at the same time.
>>
>> So to add gaps I am trying something like this.
>>
>> Would a code like this work ?
>> pipeline
>> .apply(GenerateSequence.from(0).to(100).withRate(1, Duration.
>> standardSeconds(5)))
>> .apply(ParDo.of(new BatchDataLoad()))
>> .apply(KinesisIO.write()
>> .withStreamName(streamName)
>> // other configs
>> );
>>
>>
>> Somehow this does not seem to be generating numbers at that rate which is
>> 1 per 5 seconds but all at one time.
>> Also looks like it may be creating an unbounded collection and looks like
>> kinesis is not writing anything to the stream.
>>
>> If not then is there a way to achieve this?
>>
>> Thanks
>> Sachin
>>
>>


Re: Can we write to and read from and then write to same kinesis stream using KinesisIO

2023-05-12 Thread Sachin Mittal
Only on the direct runner.

I have right now disabled aggregation on the KPL and it looks like it is
working.
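
For reference, a minimal sketch of disabling KPL aggregation on the legacy
connector via producer properties; the property name is an assumption about the
KPL configuration, and input, streamName and partitionKey are placeholders:

Properties producerProps = new Properties();
// KPL setting: do not aggregate many user records into a single Kinesis record.
producerProps.setProperty("AggregationEnabled", "false");

input.apply(KinesisIO.write()
    .withStreamName(streamName)
    .withPartitionKey(partitionKey) // placeholder
    .withProducerProperties(producerProps)
    /* other IO configs */);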

On Sat, 13 May 2023 at 3:35 AM, Pavel Solomin  wrote:

> > 100,000's of data records are accumulated and they are tried to be
> pushed to Kinesis all at once
>
> Does that happen only in direct runner? Or Flink runner behaves similarly?
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Fri, 12 May 2023 at 16:43, Sachin Mittal  wrote:
>
>> Hi,
>> So I have prepared the write pipeline something like this:
>>
>>
>> --
>> writePipeline
>> .apply(GenerateSequence.from(0).to(100))
>> .apply(ParDo.of(new DoFn() {
>> @ProcessElement
>> public void processElement(ProcessContext c) {
>> long i = c.element();
>> // Fetching data for step=i
>> List<> data = fetchForInputStep(i);
>> // output all the data one by one
>> for (Data d : data) {
>> out.output(d.asBytes());
>> }
>> }
>> }))
>> .apply(KinesisIO.write()
>> .withStreamName(streamName)
>> // other configs
>> );
>>
>> writePipeline.run().waitUntilFinish()
>>
>> What I observe is that pipeline part to push data to kinesis is only
>> happening after the entire data is loaded by a second apply function.
>> So what happens is that 100,000's of data records are accumulated and
>> they are tried to be pushed to Kinesis all at once and we get following
>> error:
>> *KPL Expiration reached while waiting in limiter*
>>
>> The logs are generated like this:
>>
>> --
>> Extracting binaries to
>> /var/folders/30/knyj9z4d3psbd4s6kffqc500gn/T/amazon-kinesis-producer-native-binaries
>> .
>> [main.cc:384] Starting up main producer
>> .
>> [main.cc:395] Entering join
>> .
>> Fetching data for step=1
>> .
>> Fetching data for step=100
>> .
>> [kinesis_producer.cc:200] Created pipeline for stream "xx"
>> [shard_map.cc:87] Updating shard map for stream "xx"
>> [shard_map.cc:148] Successfully updated shard map for stream "xx"
>> found 1 shards
>> [processing_statistics_logger.cc:111] Stage 1 Triggers: { stream:
>> 'xx', manual: 10, count: 0, size: 4688, matches: 0, timed: 0,
>> UserRecords: 742018, KinesisRecords: 4698 }
>>
>>
>> I had assumed that as soon as step 1 data was fetched it would pass the
>> data downstream and
>> the kinesis pipeline would have been created much before and would have
>> started writing to Kinesis much earlier, but this is happening only after
>> all the data is collected.
>>
>> Is there a way to fix this ?
>>
>> Thanks
>> Sachin
>>
>>
>>
>> On Wed, May 10, 2023 at 4:29 PM Pavel Solomin 
>> wrote:
>>
>>> > two pipeline objects in my application
>>>
>>> I think this should work. I meant to have 2 separate artifacts and
>>> deploy them separately, but if your app runs batch processing with 2
>>> sequential steps, 2 pipelines should work too:
>>>
>>> - writePipeline.run().waitUntilFinish()
>>> - readAndWritePipeline.run().waitUntilFinish()
>>>
>>> Best Regards,
>>> Pavel Solomin
>>>
>>> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
>>> <https://www.linkedin.com/in/pavelsolomin>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, 10 May 2023 at 11:49, Sachin Mittal  wrote:
>>>
>>>> Use case is something like this:
>>>> A source writes source data to kinesis and same is used to compute
>>>> derived data which is again written back to same stream so next level of
>>>> derived data can be computed from previous derived data and so on.
>>>>
>>>> Would there be any issues from beam side to do the same within a single
>>>> pipeline?
>>>>
>>>> When you say I have to split my app into two do you mean that I have to
>>>> create two pipeline objects in my application?
>>>>
>>>> If so then how will application end?
>>>>
>>>> Note that source is of finite size which gets written into kinesis.
>>>>
>

Re: Can we shutdown a pipeline based on some condition

2023-05-13 Thread Sachin Mittal
Hi Pavel,
I have tried the setting you suggested, but it does not seem to work.
It looks like this setting bounds the unbounded stream until the specified
time, then closes the stream, and only after that are the records applied
downstream.

In my case withMaxReadTime = 3 minutes

Logs are like this:

13:06:38.267 [direct-runner-worker] INFO a.p.f.b.c.k.ShardReadersPool -
Starting to read test-batch stream from [shardId-] shards
13:09:39.002 [direct-runner-worker] INFO a.p.f.b.c.k.ShardReadersPool -
Closing shard iterators pool
13:09:42.632 [pool-16-thread-1] INFO a.p.f.b.c.k.ShardReadersPool - Kinesis
Shard read loop has finished


As you can see, it reads all the records and then closes the reader, and only
when the read loop has finished do the downstream operators process these
records further.

So if downstream I again write to the same Kinesis stream, it is never read,
as the loop is already closed.

I guess I may still need the unbounded stream so it continues to listen for
any fresh records added to the stream, but I would like the Beam application
to shut down after a specified time.

Would something like this work:

pipeline.run().waitUntilFinish(Duration.standardMinutes(3));

This will run the pipeline for 3 minutes and then shut it down; by then it
should have computed all the derived data.


Thanks

Sachin



On Thu, May 4, 2023 at 11:26 PM Pavel Solomin  wrote:

> Hello!
>
> In case of KinesisIO there is a param which you can set - withMaxReadTime
> I think many IOs implement it. It does not check if there is still some
> outstanding data, simply finishes the pipeline when this time is over.
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Thu, 4 May 2023 at 18:20, Sachin Mittal  wrote:
>
>> Hi,
>> I am kind of building a batch/streaming hybrid beam application.
>> Data is fed into a kinesis stream and the beam pipeline is run.
>>
>> I want to stop the pipeline if no new data is fed into the stream for a
>> certain period of time, say 5 minutes.
>>
>> Is there a way of achieving this ?
>>
>> Right now I only see something like this:
>>
>> pipeline.run().waitUntilFinish();
>>
>> Instead of waiting until finish can we have some conditional finish in
>> any way ?
>>
>> Thanks
>> Sachin
>>
>


Re: Difference between sdk.io.aws2.kinesis.KinesisIO vs sdk.io.kinesis.KinesisIO

2023-05-22 Thread Sachin Mittal
Hi,
Can anyone tell me whether the enhanced fan-out consumer for Kinesis has been
released or not?
If I look at this PR
https://github.com/apache/beam/pull/23540 it looks like this feature has been merged.
I also see that this issue is now closed:
https://github.com/apache/beam/issues/19967

However, I am not able to find any mention of it in the release notes:
https://github.com/apache/beam/releases/tag/v2.47.0

Any knowledge on this would be appreciated.

Thanks
Sachin


On Mon, Sep 5, 2022 at 12:35 PM Moritz Mack  wrote:

> Hi Sachin,
>
>
>
> I’d recommend migrating to the new AWS 2 IOs in
> beam-sdks-java-io-amazon-web-services2 (using Amazon’s Java SDK v2) rather
> soon.
>
> The previous ones (beam-sdks-java-io-amazon-web-services and
> beam-sdks-java-io-kinesis) are both deprecated and not actively maintained
> anymore.
>
>
>
> Please have a look at these notes in the changelog:
>
> https://github.com/apache/beam/blob/master/CHANGES.md#2410---2022-08-23
>
> https://github.com/apache/beam/blob/master/CHANGES.md#2380---2022-04-20
>
>
>
> Currently, neither one supports enhanced fan-out consumers yet.  It’s
> certainly something we’d like to support for the new v2 modules, but I
> personally didn’t have time to start looking into it so far.
>
> https://github.com/apache/beam/issues/19967
>
>
>
> Don’t hesitate to reach out in case you are facing any issues when
> migrating!
>
>
>
> Kind regards,
>
> Moritz
>
>
>
>
>
>
>
> On 31.08.22, 10:05, "Sachin Mittal"  wrote:
>
>
>
>
> Hello folks,
>
> We are consuming from a kinesis stream in our beam application.
>
> So far we were using *sdk.io.kinesis.KinesisIO* to read from kinesis.
>
> What I understand is that this library does not use Enhanced Fan-Out
> Consumer provided by AWS.
>
>
>
> Recently I saw this library *sdk.io.aws2*.kinesis.KinesisIO and I
> wanted to understand what is the purpose of this new KinesisIO to read from
> and write to kinesis ?
>
>
>
> Does this new library use Enhanced Fan-Out Consumer ?
>
>
>
> When and how should we decide if we want to migrate to new library or
> continue to use the current one ?
>
>
>
> Thanks
>
> Sachin
>
>
>
>
>


Can anyone tell me the difference between these transformations

2023-06-09 Thread Sachin Mittal
1.

Window.>into(FixedWindows.of(windowSize))
.triggering(Never.ever())
.withAllowedLateness(allowedLateness, Window.ClosingBehavior.FIRE_ALWAYS)
.discardingFiredPanes()


2.

Window.>into(FixedWindows.of(windowSize))
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
.withAllowedLateness(allowedLateness)
.discardingFiredPanes()


If both are identical, which one should be preferred?

Thanks
Sachin


EFO KinesisIO watermarking doubt

2023-07-20 Thread Sachin Mittal
Hi,
We are implementing EFO Kinesis IO reader provided by apache beam.
I see in the code that the implementation of getCurrentTimestamp always
returns getApproximateArrivalTimestamp and not the event time which we may
have set for that record using withCustomWatermarkPolicy.

Please refer:
https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91

However for KafkaIO we do something different:
The getCurrentTimestamp is always based on the `timestampPolicy` set for
Kafka, where the user can emit a custom timestamp associated with each record.

Please refer:
https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210

So why is there a difference in these two implementations?

We want the current timestamp to be based on a custom time embedded within
the record and not the approximate arrival time, and we are not sure how we
can achieve that.

Please let us know if there is a way out to achieve this for Kinesis.

Thanks
Sachin


Can we use RedisIO to write records from an unbounded collection

2023-07-20 Thread Sachin Mittal
Hi,
I was planning to use the RedisIO write/writeStreams function in a
streaming pipeline.

https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/redis/RedisIO.html

The pipeline would read an unbounded collection from Kinesis and update
Redis: it will update the value where the key already exists and add a new
key/value pair where it does not.

Please let me know if this IO is suitable for this purpose.

I was reading up on this IO here
https://beam.apache.org/documentation/io/connectors/ and it states that it
only supports batch and not streaming.

If this works, which is the better option to use: the write or the
writeStreams function ?
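
For reference, a minimal sketch (an illustration, not a confirmed streaming
setup) of RedisIO.write() with the SET method, which overwrites the value for an
existing key and creates the key if it is missing; the endpoint and the upstream
collection are placeholders:

PCollection<KV<String, String>> keyValues = ...; // built from the Kinesis records

keyValues.apply(RedisIO.write()
    .withEndpoint("redis-host", 6379)        // placeholder host/port
    .withMethod(RedisIO.Write.Method.SET));  // update if present, insert otherwise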

Thanks
Sachin


How can we get multiple side inputs from a single pipeline ?

2023-08-28 Thread Sachin Mittal
Hi,

I was checking the code for side input patterns :

https://beam.apache.org/documentation/patterns/side-inputs/
Basically I need multiple side inputs built with the "slowly updating global
window side inputs" pattern.

So, as per the example, the pipeline is something like this:

PCollectionView<Map<String, String>> map =
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
        .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
          @ProcessElement
          public void process(@Element Long input, @Timestamp Instant timestamp,
              OutputReceiver<Map<String, String>> o) {
            o.output(/* output a map */);
            // also output another map and a list, is this possible ?
          }
        }))
        .apply(Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        .apply(Latest.globally())
        .apply(View.asSingleton());


So, as an extension of this example, from the same DoFn that fetches the side
input I may also need another Map and another List alongside the map.
The reason I need to do this in the same DoFn is that this function queries an
external source to build the side input, and the other side inputs are built
from the same source.

So I would like to avoid querying the external source multiple times to
generate multiple side inputs from different DoFns, and instead use the same
function to generate multiple side inputs.

Can I achieve this by using "Tags for multiple outputs" ?
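
For reference, a minimal sketch of what "tags for multiple outputs" could look
like here; fetchMap() and fetchList() are hypothetical helpers standing in for
the single query to the external source:

final TupleTag<Map<String, String>> mapTag = new TupleTag<Map<String, String>>() {};
final TupleTag<List<String>> listTag = new TupleTag<List<String>>() {};

PCollectionTuple outputs =
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
        .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // one query to the external source, several outputs
            c.output(fetchMap());           // main output, tagged mapTag
            c.output(listTag, fetchList()); // additional output, tagged listTag
          }
        }).withOutputTags(mapTag, TupleTagList.of(listTag)));

// A coder (e.g. ListCoder.of(StringUtf8Coder.of())) may need to be set on outputs.get(listTag).
PCollectionView<Map<String, String>> mapView =
    outputs.get(mapTag)
        .apply(Window.<Map<String, String>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
            .discardingFiredPanes())
        .apply(Latest.globally())
        .apply(View.asSingleton());

// The list view follows the same Window / Latest.globally() / View.asSingleton()
// pattern on outputs.get(listTag).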

Thanks
Sachin


Re: Issue with growing state/checkpoint size

2023-08-29 Thread Sachin Mittal
Yes, we also faced the same issue when trying to run a pipeline involving a
join of two collections. It was deployed using AWS KDA, which uses the Flink
runner. The source was Kinesis streams.

It looks like join operations are not very efficient in terms of state size
management when run on Flink.

We had to rewrite our pipeline to avoid these joins.

Thanks
Sachin


On Tue, 29 Aug 2023 at 7:00 PM, Ruben Vargas 
wrote:

> Hello
>
> I experimenting an issue with my beam pipeline
>
> I have a pipeline in which I split the work into different branches, then
> I do a join using CoGroupByKey, each message has its own unique Key.
>
> For the Join, I used a Session Window, and discarding the messages after
> trigger.
>
> I'm using Flink Runner and deployed a KInesis application. But I'm
> experiencing  an unbounded growth of the checkpoint data size. When I see
> in Flink console, the  following task has the largest checkpoint
>
> join_results/GBK -> ToGBKResult ->
> join_results/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult) -> V
>
>
> Any Advice ?
>
> Thank you very much!
>
>


Re: Issue with growing state/checkpoint size

2023-08-29 Thread Sachin Mittal
So, for the smaller collection, whose size does not grow for certain keys, we
stored the data in Redis, and instead of a Beam join, in our DoFn we just did
the lookup and got the data we needed.
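
For reference, a minimal sketch (not from the thread) of that lookup-based
enrichment, assuming a Jedis client; the Event/Enriched types and the Redis host
are hypothetical:

static class EnrichFromRedisFn extends DoFn<KV<String, Event>, KV<String, Enriched>> {
  private transient Jedis jedis;

  @Setup
  public void setup() {
    jedis = new Jedis("redis-host", 6379); // created per worker, never serialized
  }

  @ProcessElement
  public void process(ProcessContext c) {
    KV<String, Event> in = c.element();
    String sideData = jedis.get(in.getKey()); // lookup instead of a CoGroupByKey join
    c.output(KV.of(in.getKey(), Enriched.of(in.getValue(), sideData)));
  }

  @Teardown
  public void teardown() {
    if (jedis != null) {
      jedis.close();
    }
  }
}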


On Tue, 29 Aug 2023 at 8:50 PM, Ruben Vargas 
wrote:

> Hello,
>
> Thanks for the reply, Any strategy you followed to avoid joins when you
> rewrite your pipeline?
>
>
>
> On Tue, Aug 29, 2023 at 9:15 AM Sachin Mittal  wrote:
>
>> Yes even we faced the same issue when trying to run a pipeline involving
>> join of two collections. It was deployed using AWS KDA, which uses flink
>> runner. The source was kinesis streams.
>>
>> Looks like join operations are not very efficient in terms of size
>> management when run on flink.
>>
>> We had to rewrite our pipeline to avoid these joins.
>>
>> Thanks
>> Sachin
>>
>>
>> On Tue, 29 Aug 2023 at 7:00 PM, Ruben Vargas 
>> wrote:
>>
>>> Hello
>>>
>>> I experimenting an issue with my beam pipeline
>>>
>>> I have a pipeline in which I split the work into different branches,
>>> then I do a join using CoGroupByKey, each message has its own unique Key.
>>>
>>> For the Join, I used a Session Window, and discarding the messages after
>>> trigger.
>>>
>>> I'm using Flink Runner and deployed a KInesis application. But I'm
>>> experiencing  an unbounded growth of the checkpoint data size. When I see
>>> in Flink console, the  following task has the largest checkpoint
>>>
>>> join_results/GBK -> ToGBKResult ->
>>> join_results/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult) -> V
>>>
>>>
>>> Any Advice ?
>>>
>>> Thank you very much!
>>>
>>>


Re: Issue with growing state/checkpoint size

2023-09-01 Thread Sachin Mittal
Yes, a very high and non-deterministic cardinality can make the stored state
of the join operation unbounded.
In my case we knew the cardinality and it was not very high, so we could go
with a lookup-based approach using Redis to enrich the stream and avoid
joins.



On Wed, Aug 30, 2023 at 5:04 AM Ruben Vargas 
wrote:

> Thanks for the reply and the advice
>
> One more thing, Do you know if the key-space carnality impacts on this?
> I'm assuming it is, but the thing is for my case all the messages from the
> sources has a unique ID, that makes my key-space huge and is not on my
> control .
>
> On Tue, Aug 29, 2023 at 9:29 AM Sachin Mittal  wrote:
>
>> So for the smaller size of collection which does not grow with size for
>> certain keys we stored the data in redis and instead of beam join in our
>> DoFn we just did the lookup and got the data we need.
>>
>>
>> On Tue, 29 Aug 2023 at 8:50 PM, Ruben Vargas 
>> wrote:
>>
>>> Hello,
>>>
>>> Thanks for the reply, Any strategy you followed to avoid joins when you
>>> rewrite your pipeline?
>>>
>>>
>>>
>>> On Tue, Aug 29, 2023 at 9:15 AM Sachin Mittal 
>>> wrote:
>>>
>>>> Yes even we faced the same issue when trying to run a pipeline
>>>> involving join of two collections. It was deployed using AWS KDA, which
>>>> uses flink runner. The source was kinesis streams.
>>>>
>>>> Looks like join operations are not very efficient in terms of size
>>>> management when run on flink.
>>>>
>>>> We had to rewrite our pipeline to avoid these joins.
>>>>
>>>> Thanks
>>>> Sachin
>>>>
>>>>
>>>> On Tue, 29 Aug 2023 at 7:00 PM, Ruben Vargas 
>>>> wrote:
>>>>
>>>>> Hello
>>>>>
>>>>> I experimenting an issue with my beam pipeline
>>>>>
>>>>> I have a pipeline in which I split the work into different branches,
>>>>> then I do a join using CoGroupByKey, each message has its own unique Key.
>>>>>
>>>>> For the Join, I used a Session Window, and discarding the messages
>>>>> after trigger.
>>>>>
>>>>> I'm using Flink Runner and deployed a KInesis application. But I'm
>>>>> experiencing  an unbounded growth of the checkpoint data size. When I see
>>>>> in Flink console, the  following task has the largest checkpoint
>>>>>
>>>>> join_results/GBK -> ToGBKResult ->
>>>>> join_results/ConstructCoGbkResultFn/ParMultiDo(ConstructCoGbkResult) -> V
>>>>>
>>>>>
>>>>> Any Advice ?
>>>>>
>>>>> Thank you very much!
>>>>>
>>>>>


Re: EFO KinesisIO watermarking doubt

2023-10-01 Thread Sachin Mittal
Hi,
I have filed an issue: https://github.com/apache/beam/issues/28760
I have also created a PR (based on our local fix for this):
https://github.com/apache/beam/pull/28763
This can serve as a start.
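
In the meantime, a minimal sketch of the workaround Pavel describes below,
assigning timestamps from the payload downstream of the read; the stream name,
consumer ARN and the extractEventTime() helper are placeholders:

PCollection<KinesisRecord> records =
    p.apply(KinesisIO.read()
        .withStreamName("my-stream")                            // placeholder
        .withConsumerArn("arn:aws:kinesis:...:stream-consumer") // placeholder EFO consumer
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON));

PCollection<KinesisRecord> withEventTime =
    records.apply(WithTimestamps.of(
        (KinesisRecord r) -> extractEventTime(r.getDataAsBytes())));
// If the embedded time can be earlier than the arrival timestamp,
// WithTimestamps#withAllowedTimestampSkew may also be needed.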

Thanks
Sachin


On Mon, Oct 2, 2023 at 2:54 AM Pavel Solomin  wrote:

> Hello, sorry for the late reply.
>
> EFOKinesisReader implemented the same logic of timestamps non-EFO
> KinesisReader had. At the time of EFO implementation more careful
> evaluation of the records' timestamps was out of context.
>
> Can you please create an issue at https://github.com/apache/beam/issues ?
> With an issue we can track this investigation which may become a new PR or
> some clarifications in the IO documentation.
>
> > We wanted the current timestamp based on some custom time embedded
> within the record and not approximate arrival time and not sure how we can
> achieve that.
>
> KinesisIO outputs only byte[] of a message payload without any decoding.
> If your timestamps sit in the messages' payload, I think, this approach
> should work:
> https://beam.apache.org/documentation/programming-guide/#adding-timestamps-to-a-pcollections-elements
>
> Best Regards,
> Pavel Solomin
>
> Tel: +351 962 950 692 | Skype: pavel_solomin | Linkedin
> <https://www.linkedin.com/in/pavelsolomin>
>
>
>
>
>
> On Fri, 21 Jul 2023 at 07:19, Sachin Mittal  wrote:
>
>> Hi,
>> We are implementing EFO Kinesis IO reader provided by apache beam.
>> I see that in code that for implementation of getCurrentTimestamp we
>> always return getApproximateArrivalTimestamp and not the event time
>> which we may have set for that record using withCustomWatermarkPolicy.
>>
>> Please refer:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/EFOKinesisReader.java#L91
>>
>> However for KafkaIO we do something different:
>> We always get the getCurrentTimestamp based on `timestampPolicy` set for
>> Kafka where user can emit a custom timestamp associated with each record.
>>
>> Please refer:
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L210
>>
>> So why is there a difference in these two implementations?
>>
>> We wanted the current timestamp based on some custom time embedded within
>> the record and not approximate arrival time and not sure how we can achieve
>> that.
>>
>> Please let us know if there is a way out to achieve this for Kinesis.
>>
>> Thanks
>> Sachin
>>
>>


Re: implementing cool down?

2023-10-19 Thread Sachin Mittal
You can try the approach mentioned here:
https://beam.apache.org/blog/timely-processing/
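
For reference, a minimal sketch of that idea applied as a cool-down filter; the
Alert type, the keying and the one-minute duration are placeholders:

static class CoolDownFilter extends DoFn<KV<String, Alert>, Alert> {
  @StateId("suppressed")
  private final StateSpec<ValueState<Boolean>> suppressedSpec = StateSpecs.value(BooleanCoder.of());

  @TimerId("coolDown")
  private final TimerSpec coolDownSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(ProcessContext c,
      @StateId("suppressed") ValueState<Boolean> suppressed,
      @TimerId("coolDown") Timer coolDown) {
    if (Boolean.TRUE.equals(suppressed.read())) {
      return; // still within the cool-down period, drop the alert
    }
    c.output(c.element().getValue());
    suppressed.write(true);
    coolDown.offset(Duration.standardMinutes(1)).setRelative();
  }

  @OnTimer("coolDown")
  public void onCoolDown(@StateId("suppressed") ValueState<Boolean> suppressed) {
    suppressed.clear(); // cool-down over, allow alerts again
  }
}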


On Thu, Oct 19, 2023 at 12:36 PM Balogh, György  wrote:

> Hi,
> I'm  using beam from java. My pipeline generates alerts.
> I'd like to filter alerts with a cool down period: if an alert is emitted,
> disable new alerts for a given cool down period (eg: 1 minute).
> What would be the best approach to implement this?
> Gyorgy
>
> --
>
> György Balogh
> CTO
> E gyorgy.bal...@ultinous.com 
> M +36 30 270 8342
> A HU, 1117 Budapest, Budafoki út 209.
> W www.ultinous.com
>


Re: [Question] Does SnowflakeIO connector support for AWS and Azure?

2023-10-24 Thread Sachin Mittal
I think AWS is supported; I was able to configure SnowflakeIO with S3
buckets.



On Wed, 25 Oct 2023 at 9:05 AM, mybeam  wrote:

> Hello,
>
> As per the javadoc below, only the GCP bucket is supported currently by
> SnowflakeIO connector? Can you please confirm that AWS and Azure are
> supported now? Thanks.
>
>
> https://beam.apache.org/releases/javadoc/2.50.0/index.html?org/apache/beam/sdk/io/snowflake/SnowflakeIO.html
> ,
>


Re: Pipeline Stalls at GroupByKey Step

2023-11-16 Thread Sachin Mittal
Do you add a timestamp to every record you output in the
ConvertFromKafkaRecord step, or in any step before that?
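
For context, a minimal sketch of what that could look like inside such a step;
convert() and eventTimeOf() are placeholders for the existing conversion logic
and the event-time field in the Avro payload:

@ProcessElement
public void processElement(ProcessContext c) {
  KV<String, IpSessionInfo> element = convert(c.element()); // placeholder conversion
  // Emit with an explicit event timestamp so downstream windows/triggers can fire.
  c.outputWithTimestamp(element, eventTimeOf(element.getValue()));
}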

On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov  wrote:

> Hi,
>
> In our pipeline, we've encountered an issue with the GroupByKey step.
> After some time of running, it seems that messages are not progressing
> through the GroupByKey step, causing the pipeline to stall in data
> processing.
> To troubleshoot this issue, we added debug logging before and after the
> GroupByKey step.
>
> We are using Beam version 2.50 with Flink 1.16.
>
> running with only 1 task manager, 2 slots. parallelism 2. no HA.
>
> any insights or suggestions?
> The messages are KV - of String and an Avro message.
>
> PCollection<KV<String, IpSessionInfo>> ipSessionInput = pipeline
>     .apply("readIpSessionInfo", KafkaTransform.readAvroMessageFromKafka(
>         pipelineUtil.getBootstrapServers(),
>         options.getSourceKafkaTopic(),
>         PIPELINE_NAME,
>         IpSessionInfo.class,
>         IpSessionInfoDeserializer.class))
>
>     .apply("ConvertIpSessionFromKafka", ParDo.of(new ConvertFromKafkaRecord<>()))
>
>     .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes())
>
>     .apply("DebugLogBeforeGroupByKey", ParDo.of(
>         new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
>       @DoFn.ProcessElement
>       public void processElement(ProcessContext c) {
>         KV<String, IpSessionInfo> element = c.element();
>         log.atInfo().log("Before GroupByKey: " + element);
>         c.output(element);
>       }
>     }))
>
>     *.apply(GroupByKey.create())*
>     .apply("DebugLogAfterGroupByKey", ParDo.of(
>         new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
>         log.atInfo().log("After GroupByKey: " + groupedElement);
>         c.output(groupedElement);
>       }
>     }))
>     .apply(ParDo.of(new ConvertIpSession()));
>
>
> thanks
> Sigalit
>


Re: [Question] Does SnowflakeIO connector support for AWS and Azure?

2023-12-02 Thread Sachin Mittal
Hi,
What are you trying to do ?
Write to snowflake or read from it ?

Thanks
Sachin


On Fri, Dec 1, 2023 at 10:43 AM Xinmin  wrote:

> Hello Sachin,
>
> You said that you were able to configure SnowflakeIO with S3 bucket. Can
> you please share with me the steps to configure and test it? I would really
> appreciate it. Thanks.
>
>
> Regards,
> Xinmin
>
>
> On Thu, Oct 26, 2023 at 9:42 AM mybeam  wrote:
>
>>  Hi Sachin,
>>
>> Thanks for your information.
>>
>> On Tue, Oct 24, 2023 at 11:02 PM Sachin Mittal 
>> wrote:
>>
>>> I think AWS is supported and I was able to configure snowflake io with
>>> S3 buckets.
>>>
>>>
>>>
>>> On Wed, 25 Oct 2023 at 9:05 AM, mybeam  wrote:
>>>
>>>> Hello,
>>>>
>>>> As per the javadoc below, only the GCP bucket is supported currently by
>>>> SnowflakeIO connector? Can you please confirm that AWS and Azure are
>>>> supported now? Thanks.
>>>>
>>>>
>>>> https://beam.apache.org/releases/javadoc/2.50.0/index.html?org/apache/beam/sdk/io/snowflake/SnowflakeIO.html
>>>> ,
>>>>
>>>


Re: [Question] Does SnowflakeIO connector support for AWS and Azure?

2023-12-04 Thread Sachin Mittal
I think along with the storage integration name you also have to specify
the snowpipe to use.

 SnowflakeIO.<>write()
.withDataSourceConfiguration(dc)
.to(TABLE_NAME)
.withStagingBucketName(stagingBucketName)
.withStorageIntegrationName(storageIntegrationName)
.withDebugMode(StreamingLogLevel.INFO) // change this to
get more information
.withSnowPipe("pipe")

However, looking at your error I think there may be some other issue. It is
mainly related to how you are authenticating to S3, and it looks like the
authentication Beam provides does not match what your AWS account requires.

What I can suggest is to just write to S3 using a Beam IO.

Then configure the Snowpipe to read from that S3 bucket into Snowflake; it
will work independently from your Beam application.

In my case I have almost always just written to S3 and then used Snowpipe
to move data from S3 to Snowflake, and there is a standard way of doing that.
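
For reference, a minimal sketch (bucket and prefix are placeholders) of the
suggestion above: write the records to S3 with a Beam IO and let a Snowpipe
configured on that bucket load them into Snowflake.

PCollection<String> rows = ...; // e.g. CSV or JSON lines prepared upstream

rows.apply(TextIO.write()
    .to("s3://my-bucket/snowpipe-staging/part") // placeholder bucket/prefix
    .withSuffix(".csv"));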

Thanks
Sachin


On Sun, Dec 3, 2023 at 11:33 AM Xinmin  wrote:

> Hello Sachin,
>
> I am trying to write data to Snowflake using SnowflakeIO Connector. As per
> https://docs.snowflake.com/en/user-guide/data-load-s3-config, there are
> three options to configure the access to S3 buckets in Snowflake. 1)
> Storage integration which avoids the need to provide access key and secret;
> 2) IAM role; 3) IAM user which needs us to provide the access key and
> secret. The option of IAM role is deprecated by Snowflake.
>
> As per https://beam.apache.org/documentation/io/built-in/snowflake/, it
> mentions the usage of storage integration.
>
> [image: image.png]
>
> In the meantime, it also mentions that we need to provide both keys, which
> should be related to the option of IAM user.
>
> [image: image.png]
>
> I googled the usage of SnowflakeIO connector and S3 bucket and then found
> examples using Python with both keys (access key and secret key).
>
> https://github.com/dennisbachmann/beam-s3-reader/tree/master
>
> https://python.plainenglish.io/using-apache-beam-and-aws-s3-storage-i-o-transforms-in-python-6cabe2a8d592
>
> https://medium.com/@varunkrishna97/apache-beam-reading-from-writing-to-s3-in-python-sdk-6f2ff6419a14
>
> Since we are not allowed to create an IAM user in AWS due to security
> policy, we would like to go with the option of storage integration, I did
> the test with that and got the error below. Not sure if it was caused by
> the environment or something else.
>
> org.apache.beam.sdk.Pipeline$PipelineExecutionException:
> java.io.IOException: com.amazonaws.services.s3.model.AmazonS3Exception: The
> provided token has expired. (Service: Amazon S3; Status Code: 400; Error
> Code: ExpiredToken; Request ID: 1RCH191011YK9NEJ; S3 Extended Request ID:
> foh3wogVMGT7+kQ+5sbnjPf7WA+9Vbz6zfwmnz8GDChNgkPhf0Wzk9MRBBNl9KqsjHDDB/Jt1r4=;
> Proxy: null), S3 Extended Request ID:
> foh3wogVMGT7+kQ+5sbnjPf7WA+9Vbz6zfwmnz8GDChNgkPhf0Wzk9MRBBNl9KqsjHDDB/Jt1r4=
>
> I would appreciate it if you could share with me the steps that you
> followed to do the test using the SnowflakeIO connector and S3 bucket.
> Thanks.
>
> Regards,
> Xinmin
>
>
>
>
> On Sat, Dec 2, 2023 at 11:46 AM Sachin Mittal  wrote:
>
>> Hi,
>> What are you trying to do ?
>> Write to snowflake or read from it ?
>>
>> Thanks
>> Sachin
>>
>>
>> On Fri, Dec 1, 2023 at 10:43 AM Xinmin  wrote:
>>
>>> Hello Sachin,
>>>
>>> You said that you were able to configure SnowflakeIO with S3 bucket. Can
>>> you please share with me the steps to configure and test it? I would really
>>> appreciate it. Thanks.
>>>
>>>
>>> Regards,
>>> Xinmin
>>>
>>>
>>> On Thu, Oct 26, 2023 at 9:42 AM mybeam  wrote:
>>>
>>>>  Hi Sachin,
>>>>
>>>> Thanks for your information.
>>>>
>>>> On Tue, Oct 24, 2023 at 11:02 PM Sachin Mittal 
>>>> wrote:
>>>>
>>>>> I think AWS is supported and I was able to configure snowflake io with
>>>>> S3 buckets.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, 25 Oct 2023 at 9:05 AM, mybeam  wrote:
>>>>>
>>>>>> Hello,
>>>>>>
>>>>>> As per the javadoc below, only the GCP bucket is supported currently
>>>>>> by SnowflakeIO connector? Can you please confirm that AWS and Azure are
>>>>>> supported now? Thanks.
>>>>>>
>>>>>>
>>>>>> https://beam.apache.org/releases/javadoc/2.50.0/index.html?org/apache/beam/sdk/io/snowflake/SnowflakeIO.html
>>>>>> ,
>>>>>>
>>>>>


Re: [External Sender] Re: [Question] Apache Beam Pipeline AWS Credentials

2024-01-10 Thread Sachin Mittal
Hi,
Check the class org.apache.beam.sdk.io.aws2.options.AwsOptions.
It tells you how to set the AwsCredentialsProvider used to configure AWS
service clients.

For example, you can run the Beam application with the following pipeline
option:

 --awsCredentialsProvider={"@type": "StaticCredentialsProvider",
 "awsAccessKeyId": "key_id_value","awsSecretKey": "secret_value",
 "sessionToken": "token_value"  }

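For reference, a minimal sketch (an assumption, not from the original thread) of
the programmatic equivalent of the option above:

AwsOptions awsOptions = PipelineOptionsFactory.fromArgs(args).as(AwsOptions.class);
awsOptions.setAwsCredentialsProvider(
    StaticCredentialsProvider.create(
        AwsSessionCredentials.create("key_id_value", "secret_value", "token_value")));
Pipeline pipeline = Pipeline.create(awsOptions);
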
Thanks
Sachin



On Wed, Jan 10, 2024 at 11:30 PM Ramya Prasad via user 
wrote:

> Hi Alexey,
> Thanks for the quick response! Apologies for using the dev list, I was
> unsure which list to be using! So I did try to create my own
> AwsCredentialsProvider class, but I'm running into a Serializable issue, as
> the error I'm getting is "Failed to serialize PipelineOptions". This is
> roughly what my class looks like, any suggestions?
>
> import software.amazon.awssdk.auth.credentials.AwsCredentials;
> import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
> import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
>
> import java.io.Serializable;
> import java.time.Instant;
>
> public class CustomAwsCredentialsProvider implements Serializable, 
> AwsCredentialsProvider {
> private AwsSessionCredentials creds;
>
> public CustomAwsCredentialsProvider(AwsSessionCredentials creds) {
> this.creds = creds;
> }
>
> @Override
> public AwsCredentials resolveCredentials() {
>
> if (this.creds.expirationTime().isPresent()) {
> Instant exp = this.creds.expirationTime().get();
> if (exp.isBefore(Instant.now())) {
> 
> }
> }
> return this.creds;
> }
> }
>
> Thanks and sincerely,
> Ramya
> On Wed, Jan 10, 2024 at 9:00 AM Alexey Romanenko 
> wrote:
>
>> Hi Ramya,
>>
>> I don’t think there is a solution out-of-the-box for such case but I
>> believe you can create your own AwsCredentialsProvider and implement the
>> logic to fetch the credentials dynamically.
>>
>> PS: Please, use only user@beam.apache.org mailing list for such type of
>> questions.
>>
>> —
>> Alexey
>>
>> On 9 Jan 2024, at 19:39, Ramya Prasad via user 
>> wrote:
>>
>> Hello,
>>
>> I am a developer trying to use Apache Beam in Java, and I am having an
>> issue with my AWS credentials expiring before my pipeline is done
>> executing. I'm limited by how I get my AWS credentials, as I set the
>> credentials using the StaticCredentialsProvider class. I set the
>> credentials (which is an access key, secret access key, and session token)
>> in the PipelineOptions object before I create the pipeline. However, I
>> believe the PipelineOptions object cannot be modified during runtime. I'm
>> not sure how to refresh my credentials so the pipeline doesn't break during
>> execution.
>>
>> Some code for how I set my credentials for reference:
>>
>> Credentials awsCreds = ;
>> AwsCredentials credentials = 
>> AwsSessionCredentials.create(awsCreds.getAccessKeyId(), 
>> awsCreds.getSecretAccessKey(), awsCreds.getSessionToken());
>> AwsCredentialsProvider provider = 
>> StaticCredentialsProvider.create(credentials);
>>
>> pipelineOptions.as(AwsOptions.class).setAwsCredentialsProvider(provider);
>>
>>
>>
>> Any help would be appreciated!
>>
>> Thanks and sincerely,
>> Ramya
>> --
>>
>>
>> The information contained in this e-mail may be confidential and/or
>> proprietary to Capital One and/or its affiliates and may only be used
>> solely in performance of work or services for Capital One. The information
>> transmitted herewith is intended only for use by the individual or entity
>> to which it is addressed. If the reader of this message is not the intended
>> recipient, you are hereby notified that any review, retransmission,
>> dissemination, distribution, copying or other use of, or taking of any
>> action in reliance upon this information is strictly prohibited. If you
>> have received this communication in error, please contact the sender and
>> delete the material from your computer.
>>
>>
>>
>> --
>
> The information contained in this e-mail may be confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>
>
>
>
>