[ https://issues.apache.org/jira/browse/BEAM-7357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16854431#comment-16854431 ]
Alexey Romanenko commented on BEAM-7357: ---------------------------------------- [~brachi_packter] Thank you for testing. Any chance that you haven't used the last version of {{2.14.0-SNAPSHOT}} from this [snapshot repository|https://repository.apache.org/content/repositories/snapshots/] ? What is exactly the version of shapshot jar did you try? > Kinesis IO.write throws LimitExceededException > ---------------------------------------------- > > Key: BEAM-7357 > URL: https://issues.apache.org/jira/browse/BEAM-7357 > Project: Beam > Issue Type: Bug > Components: io-java-kinesis > Affects Versions: 2.11.0 > Reporter: Brachi Packter > Assignee: Alexey Romanenko > Priority: Major > Fix For: 2.14.0 > > Time Spent: 0.5h > Remaining Estimate: 0h > > I used Kinesis IO to write to kinesis. I get very quickly many exceptions > like: > [shard_map.cc:150] Shard map update for stream "***" failed. Code: > LimitExceededException Message: Rate exceeded for stream *** under account > ***; retrying in .. > Also, I see many exceptions like: > Caused by: java.lang.IllegalArgumentException: Stream ** does not exist at > org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191) > at > org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515) > I'm sure this stream exists because I can see some data from my pipeline that > was successfully ingested to it. > > Here is my code: > > > {code:java} > .apply(KinesisIO.write() > .withStreamName("**") > .withPartitioner(new KinesisPartitioner() { > @Override > public String getPartitionKey(byte[] value) { > return UUID.randomUUID().toString() > } > @Override > public String getExplicitHashKey(byte[] value) { > return null; > } > }) > .withAWSClientsProvider("**","***",Regions.US_EAST_1));{code} > > I tried to not use the Kinesis IO. and everything works well, I can't figure > out what went wrong. > I tried using the same API as the library did. > > {code:java} > .apply( > ParDo.of(new DoFn<byte[], Void>() { > private transient IKinesisProducer inlineProducer; > @Setup > public void setup(){ > KinesisProducerConfiguration config = > KinesisProducerConfiguration.fromProperties(new Properties()); > config.setRegion(Regions.US_EAST_1.getName()); > config.setCredentialsProvider(new AWSStaticCredentialsProvider(new > BasicAWSCredentials("***", "***"))); > inlineProducer = new KinesisProducer(config); > } > @ProcessElement > public void processElement(ProcessContext c) throws Exception { > ByteBuffer data = ByteBuffer.wrap(c.element()); > String partitionKey =UUID.randomUUID().toString(); > ListenableFuture<UserRecordResult> f = > getProducer().addUserRecord("***", partitionKey, data); > Futures.addCallback(f, new UserRecordResultFutureCallback()); > } > class UserRecordResultFutureCallback implements > FutureCallback<UserRecordResult> { > @Override > public void onFailure(Throwable cause) { > throw new RuntimeException("failed produce:"+cause); > } > @Override > public void onSuccess(UserRecordResult result) { > } > } > }) > ); > > {code} > > Any idea what I did wrong? or what the error in the KinesisIO? -- This message was sent by Atlassian JIRA (v7.6.3#76005)