[ https://issues.apache.org/jira/browse/BEAM-7357?focusedWorklogId=253855&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-253855 ]
ASF GitHub Bot logged work on BEAM-7357:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 04/Jun/19 17:16
            Start Date: 04/Jun/19 17:16
    Worklog Time Spent: 10m
      Work Description: aromanenko-dev commented on pull request #8758: [BEAM-7357] Remove stream existence check
URL: https://github.com/apache/beam/pull/8758

AWS Kinesis limits `DescribeStream` (the call used to check that a stream exists) to 10 transactions per second per account. The check therefore fails when a pipeline runs with many (>10) workers. So this change removes the check and leaves that responsibility to AWS Kinesis.

------------------------

Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

 - [x] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
 - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
 - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
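
The failure mode in the pull request description (more than 10 workers each issuing one existence check in the same second) can be illustrated with a toy model. This is only a sketch under stated assumptions: it makes no AWS calls, the class and method names (`DescribeStreamQuotaSketch`, `PerSecondQuota`, `countThrottled`) are hypothetical, and the fixed one-second window is a simplification rather than a description of how Kinesis actually meters requests.

```java
/**
 * Toy model only: no AWS SDK is involved and all names are hypothetical.
 * It counts how many simultaneous setup-time checks would exceed a
 * per-second, per-account call limit.
 */
final class DescribeStreamQuotaSketch {

    /** Fixed-window quota: at most limitPerSecond calls are accepted per one-second window. */
    static final class PerSecondQuota {
        private final int limitPerSecond;
        private long currentWindow = Long.MIN_VALUE; // sentinel: no window seen yet
        private int usedInWindow = 0;

        PerSecondQuota(int limitPerSecond) {
            this.limitPerSecond = limitPerSecond;
        }

        /** Returns true if a call at nowMillis fits in the quota, false if it is throttled. */
        boolean tryAcquire(long nowMillis) {
            long window = Math.floorDiv(nowMillis, 1000L);
            if (window != currentWindow) {
                // A new one-second window starts with a fresh budget.
                currentWindow = window;
                usedInWindow = 0;
            }
            if (usedInWindow < limitPerSecond) {
                usedInWindow++;
                return true;
            }
            return false;
        }
    }

    /** Counts throttled checks when every worker performs one check at the same instant. */
    static int countThrottled(int workers, int limitPerSecond) {
        PerSecondQuota quota = new PerSecondQuota(limitPerSecond);
        int throttled = 0;
        for (int i = 0; i < workers; i++) {
            if (!quota.tryAcquire(0L)) {
                throttled++;
            }
        }
        return throttled;
    }

    public static void main(String[] args) {
        // 10 is the per-account limit quoted in the pull request description.
        for (int workers : new int[] {5, 10, 25}) {
            System.out.println(workers + " workers: "
                + countThrottled(workers, 10) + " throttled checks");
        }
    }
}
```

In this model, any worker count above the limit leaves the surplus checks throttled. That is consistent with the reported behaviour: some workers fail in setup while others ingest data successfully.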

See [.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md) for trigger phrase, status and link of all Jenkins jobs.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 253855)
    Time Spent: 40m  (was: 0.5h)

> Kinesis IO.write throws LimitExceededException
> ----------------------------------------------
>
>                 Key: BEAM-7357
>                 URL: https://issues.apache.org/jira/browse/BEAM-7357
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-kinesis
>    Affects Versions: 2.11.0
>            Reporter: Brachi Packter
>            Assignee: Alexey Romanenko
>            Priority: Major
>             Fix For: 2.14.0
>
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> I used Kinesis IO to write to kinesis.
> I very quickly get many exceptions like:
> [shard_map.cc:150] Shard map update for stream "***" failed. Code:
> LimitExceededException Message: Rate exceeded for stream *** under account
> ***; retrying in ..
> Also, I see many exceptions like:
> Caused by: java.lang.IllegalArgumentException: Stream ** does not exist
>   at org.apache.beam.vendor.guava.v20_0.com.google.common.base.Preconditions.checkArgument(Preconditions.java:191)
>   at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.setup(KinesisIO.java:515)
> I'm sure this stream exists because I can see some data from my pipeline that
> was successfully ingested into it.
>
> Here is my code:
>
> {code:java}
> .apply(KinesisIO.write()
>     .withStreamName("**")
>     .withPartitioner(new KinesisPartitioner() {
>         @Override
>         public String getPartitionKey(byte[] value) {
>             return UUID.randomUUID().toString();
>         }
>         @Override
>         public String getExplicitHashKey(byte[] value) {
>             return null;
>         }
>     })
>     .withAWSClientsProvider("**","***",Regions.US_EAST_1));{code}
>
> I tried not using Kinesis IO, and everything works well. I can't figure
> out what went wrong.
> I tried using the same API as the library did.
>
> {code:java}
> .apply(
>     ParDo.of(new DoFn<byte[], Void>() {
>         private transient IKinesisProducer inlineProducer;
>
>         // Create one producer per DoFn instance.
>         @Setup
>         public void setup() {
>             KinesisProducerConfiguration config =
>                 KinesisProducerConfiguration.fromProperties(new Properties());
>             config.setRegion(Regions.US_EAST_1.getName());
>             config.setCredentialsProvider(new AWSStaticCredentialsProvider(
>                 new BasicAWSCredentials("***", "***")));
>             inlineProducer = new KinesisProducer(config);
>         }
>
>         // Send each element with a random partition key.
>         @ProcessElement
>         public void processElement(ProcessContext c) throws Exception {
>             ByteBuffer data = ByteBuffer.wrap(c.element());
>             String partitionKey = UUID.randomUUID().toString();
>             ListenableFuture<UserRecordResult> f =
>                 inlineProducer.addUserRecord("***", partitionKey, data);
>             Futures.addCallback(f, new UserRecordResultFutureCallback());
>         }
>
>         class UserRecordResultFutureCallback implements
>             FutureCallback<UserRecordResult> {
>             @Override
>             public void onFailure(Throwable cause) {
>                 throw new RuntimeException("failed produce:" + cause);
>             }
>             @Override
>             public void onSuccess(UserRecordResult result) {
>             }
>         }
>     })
> );
>
> {code}
>
> Any idea what I did wrong, or what the error in KinesisIO is?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)