[ https://issues.apache.org/jira/browse/BEAM-5060?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismaël Mejía resolved BEAM-5060.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 2.7.0

> Issues with aws KPL while writing to kinesis using beam
> -------------------------------------------------------
>
>                 Key: BEAM-5060
>                 URL: https://issues.apache.org/jira/browse/BEAM-5060
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-aws
>    Affects Versions: 2.5.0
>            Reporter: Varsha Thanooj
>            Assignee: Alexey Romanenko
>            Priority: Critical
>             Fix For: 2.7.0
>
>         Attachments: pom.xml
>
>          Time Spent: 0.5h
>  Remaining Estimate: 0h
>
> I am trying to write data to kinesis using apache beam kinesis IO. But I am
> having some issues.
> PS: I am using aws sts.
>
> The console output shows....
>
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture;
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:349)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:319)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:210)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:66)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:311)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>     at com.nestaway.beam_demo.KinesisSql.main(KinesisSql.java:153)
> Caused by: java.lang.NoSuchMethodError: com.amazonaws.services.kinesis.producer.IKinesisProducer.addUserRecord(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/com/google/common/util/concurrent/ListenableFuture;
>     at org.apache.beam.sdk.io.kinesis.KinesisIO$Write$KinesisWriterFn.processElement(KinesisIO.java:568)
> {code}
>
>
> Code.......
> data is a Pcollection in byte[ ] format.
>
> {code:java}
> data.apply(KinesisIO.write()
>     .withStreamName("stagWatchBallEventStream")
>     .withPartitionKey("a")
>     .withAWSClientsProvider(new CustomKinesisClientProvider()));
> {code}
>
> Custom Kinesis Client :
> {code:java}
> public class CustomKinesisClientProvider implements AWSClientsProvider {
>     private static final long serialVersionUID = 1L;
>     private static String ID = "XXXXX";
>     private static String SECRET = "XXXXX";
>     private static String TOKEN = "XXXXX";
>     private static BasicSessionCredentials sessionCredentials = new BasicSessionCredentials(
>         ID,
>         SECRET,
>         TOKEN);
>     private static KinesisProducerConfiguration config = new KinesisProducerConfiguration()
>         .setRecordMaxBufferedTime(3000)
>         .setMaxConnections(1)
>         .setRequestTimeout(60000)
>         .setRegion("us-west-2")
>         .setCredentialsProvider(new AWSStaticCredentialsProvider(sessionCredentials));
>
>     @Override
>     public IKinesisProducer createKinesisProducer(KinesisProducerConfiguration config) {
>         return new KinesisProducer(config);
>     }
> }{code}
>
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
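
A note on the stack trace quoted above. The method descriptor in the NoSuchMethodError names a return type under org/apache/beam/repackaged/..., which suggests that the calling Beam class was compiled against a relocated (shaded) copy of Guava, whereas the KPL's own addUserRecord returns the unshaded com.google.common.util.concurrent.ListenableFuture. The sketch below only decodes the return type from such a descriptor so the mismatch is easier to see. The class DescriptorReturnType is a hypothetical helper written for this note; it is not part of Beam or the KPL, and it is not taken from the actual fix.

```java
// Hypothetical helper for illustration only (not a Beam or KPL API):
// extracts the return type from a JVM method descriptor like the one
// printed in the NoSuchMethodError above.
final class DescriptorReturnType {

    // Returns the dotted class name for object return types, or the raw
    // descriptor fragment for primitives, arrays, and void.
    static String of(String descriptor) {
        int close = descriptor.lastIndexOf(')');
        if (close < 0 || close == descriptor.length() - 1) {
            throw new IllegalArgumentException("not a method descriptor: " + descriptor);
        }
        String ret = descriptor.substring(close + 1);
        if (ret.startsWith("L") && ret.endsWith(";")) {
            return ret.substring(1, ret.length() - 1).replace('/', '.');
        }
        return ret;
    }

    public static void main(String[] args) {
        // Descriptor copied from the stack trace in the report.
        String fromTrace =
            "(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/nio/ByteBuffer;)"
            + "Lorg/apache/beam/repackaged/beam_sdks_java_io_kinesis/"
            + "com/google/common/util/concurrent/ListenableFuture;";
        String unshaded = "com.google.common.util.concurrent.ListenableFuture";

        String expectedByCaller = of(fromTrace);
        System.out.println("caller expects: " + expectedByCaller);
        // True when the type is Guava's ListenableFuture under another package prefix.
        boolean relocated = !expectedByCaller.equals(unshaded)
            && expectedByCaller.endsWith(unshaded);
        System.out.println("looks like a relocated Guava type: " + relocated);
    }
}
```

If the decoded type differs from the unshaded Guava name only by a package prefix, as it appears to here, the cause is more likely a packaging mismatch between the two jars than anything in the pipeline code itself.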