[ https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16864956#comment-16864956 ]

Brachi Packter commented on BEAM-7357:
--------------------------------------

I get the same error:
 [0x00001728][0x00007f13ed4c4700] [error] [shard_map.cc:150] Shard map update 
for stream "**" failed. Code: LimitExceededException Message: Rate exceeded for 
stream poc-test under account **.; retrying in 5062 ms

I'm not seeing a full stack trace, but I can also see this in the log:

[2019-06-13 08:29:09.427018] [0x000007e1][0x00007f8d508d3700] [warning] [AWS 
Log: WARN](AWSErrorMarshaller)Encountered AWSError Throttling Rate exceeded
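Since the KPL itself is being throttled, one mitigation worth trying is its tuning knobs. A minimal sketch of a KPL properties file (the property names come from the KPL's KinesisProducerConfiguration; the values are illustrative guesses, not tested against this job):

```
AggregationEnabled=true
RateLimit=80
RecordMaxBufferedTime=100
```

AggregationEnabled batches many user records into each Kinesis record, RateLimit caps the put rate as a percentage of the per-shard limit, and RecordMaxBufferedTime trades latency for larger batches.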

 

More details:
I'm using the Dataflow runner, Java SDK 2.11.

60 workers initially (with autoscaling, and also with the 
"enableStreamingEngine" flag).

Normally I'm producing 4-5k records per second, but when there is latency this 
can grow by a factor of 3-4.

When I start the Dataflow job there is latency, so I produce more data, and it 
fails immediately.

Also, I have consumers (a third-party tool) that I know call DescribeStream 
every 30 seconds.
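For scale, a rough back-of-envelope on the DescribeStream budget. The KPL refresh cadence below is an assumed value for illustration; only the 30-second consumer interval and the 60 workers come from this report:

```java
// Back-of-envelope: aggregate DescribeStream call rate against the
// 10-calls-per-second account limit for that API.
public class DescribeStreamBudget {
    public static void main(String[] args) {
        int workers = 60;                     // one KPL instance per Dataflow worker
        double kplRefreshSeconds = 5.0;       // assumed shard-map retry/refresh cadence
        double consumerRefreshSeconds = 30.0; // third-party consumers (see above)

        double callsPerSecond = workers / kplRefreshSeconds + 1 / consumerRefreshSeconds;
        System.out.printf("~%.1f DescribeStream calls/sec (limit: 10/sec)%n", callsPerSecond);
    }
}
```

Even under these rough assumptions, 60 workers retrying shard-map updates comfortably exceed the account-level limit on their own.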

My pipeline runs on GCP and reads data from Pub/Sub at around 20,000 records 
per second (in normal operation; during latency spikes even 100,000 records 
per second). It does many aggregations and counts based on several dimensions 
(using Beam SQL) over a 1-minute sliding window, and writes the aggregation 
results to a Kinesis stream.

My stream has 10 shards, and my partition key logic generates a UUID for each 
record: 

UUID.randomUUID().toString()
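A fresh UUID per record means every record carries a distinct partition key, which prevents the KPL from aggregating user records under a shared key. A minimal sketch of an alternative with a small bounded key pool; the class name and key format are hypothetical, the methods only mirror the shape of Beam's KinesisPartitioner, and this is untested against the pipeline above:

```java
import java.util.concurrent.ThreadLocalRandom;

// Sketch: draw partition keys from a small fixed pool instead of a fresh
// UUID per record, so the KPL can aggregate many user records per key.
public class BoundedKeyPartitioner {
    private static final int NUM_KEYS = 10; // roughly match the shard count

    public String getPartitionKey(byte[] value) {
        // "key-0" .. "key-9": records still spread across shards,
        // but repeated keys allow KPL aggregation.
        return "key-" + ThreadLocalRandom.current().nextInt(NUM_KEYS);
    }

    public String getExplicitHashKey(byte[] value) {
        return null; // let Kinesis hash the partition key
    }
}
```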

 

I hope this gives you some more context on my problem.

Another suggestion: could you try fixing the issue as I suggested and provide 
me a specific version for testing, without merging it to master? (I would do 
it myself, but I had trouble building the huge Apache Beam repository 
locally..)

 

 

> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
>                 Key: BEAM-7357
>                 URL: https://issues.apache.org/jira/browse/BEAM-7357
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.11.0
>            Reporter: Brachi Packter
>            Assignee: Alexey Romanenko
>            Priority: Major
>             Fix For: 2.14.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> I used KinesisIO to write to Kinesis. Very quickly, I get many exceptions 
> like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code: 
> LimitExceededException Message: Rate exceeded for stream *** under account 
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at 
> org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>  at 
> org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that 
> was successfully ingested to it.
>  
> Here is my code:
>  
>  
> {code:java}
> .apply(KinesisIO.write()
>        .withStreamName("**")
>        .withPartitioner(new KinesisPartitioner() {
>            @Override
>            public String getPartitionKey(byte[] value) {
>                return UUID.randomUUID().toString();
>            }
>
>            @Override
>            public String getExplicitHashKey(byte[] value) {
>                return null;
>            }
>        })
>        .withAWSClientsProvider("**", "***", Regions.US_EAST_1));{code}
>  
> I tried not using KinesisIO, and everything works well; I can't figure out 
> what went wrong.
> I tried using the same API the library uses:
>  
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>         private transient IKinesisProducer inlineProducer;
>
>         @Setup
>         public void setup() {
>             KinesisProducerConfiguration config =
>                 KinesisProducerConfiguration.fromProperties(new Properties());
>             config.setRegion(Regions.US_EAST_1.getName());
>             config.setCredentialsProvider(new AWSStaticCredentialsProvider(
>                 new BasicAWSCredentials("***", "***")));
>             inlineProducer = new KinesisProducer(config);
>         }
>
>         // Accessor for the producer created in setup()
>         private IKinesisProducer getProducer() {
>             return inlineProducer;
>         }
>
>         @ProcessElement
>         public void processElement(ProcessContext c) throws Exception {
>             ByteBuffer data = ByteBuffer.wrap(c.element());
>             String partitionKey = UUID.randomUUID().toString();
>             ListenableFuture<UserRecordResult> f =
>                 getProducer().addUserRecord("***", partitionKey, data);
>             Futures.addCallback(f, new UserRecordResultFutureCallback());
>         }
>
>         class UserRecordResultFutureCallback
>                 implements FutureCallback<UserRecordResult> {
>             @Override
>             public void onFailure(Throwable cause) {
>                 throw new RuntimeException("failed produce: " + cause);
>             }
>
>             @Override
>             public void onSuccess(UserRecordResult result) {}
>         }
>     })
> );
> {code}
>  
> Any idea what I did wrong, or what the error in KinesisIO is?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
