Hi,

As far as I know, there is no Firehose sink in Flink 1.13, only a Kinesis Data Streams one [1].

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82
https://github.com/MartijnVisser

[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/datastream/overview/

On Thu, 12 May 2022 at 09:37, Zain Haider Nemati <zain.hai...@retailo.co> wrote:

> Hi, Appreciate your response.
> My flink version is 1.13.
> Is there any other way to sink data to kinesis without having to update to
> 1.15
>
> On Thu, May 12, 2022 at 12:25 PM Martijn Visser <martijnvis...@apache.org>
> wrote:
>
>> I'm guessing this must be Flink 1.15 since Firehose was added in that
>> version :)
>>
>> On Thu, 12 May 2022 at 08:41, yu'an huang <h.yuan...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Your code is working fine in my computer. What is the Flink version you
>>> are using.
>>>
>>> On 12 May 2022, at 3:39 AM, Zain Haider Nemati <zain.hai...@retailo.co>
>>> wrote:
>>>
>>> Hi Folks,
>>> Getting this error when sinking data to a firehosesink, would really
>>> appreciate some help !
>>>
>>> DataStream<String> inputStream = env.addSource(new
>>> FlinkKafkaConsumer<>("xxx", new SimpleStringSchema(), properties));
>>>
>>> Properties sinkProperties = new Properties();
>>> sinkProperties.put(AWSConfigConstants.AWS_REGION, "xxx");
>>> sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID,
>>> "xxx");
>>> sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY,
>>> "xxx");
>>> KinesisFirehoseSink<String> kdfSink =
>>> KinesisFirehoseSink.<String>builder()
>>>         .setFirehoseClientProperties(sinkProperties)
>>>         .setSerializationSchema(new SimpleStringSchema())
>>>         .setDeliveryStreamName("xxx")
>>>         .setMaxBatchSize(350)
>>>         .build();
>>>
>>> inputStream.sinkTo(kdfSink);
>>>
>>> incompatible types:
>>> org.apache.flink.connector.firehose.sink.KinesisFirehoseSink<java.lang.String>
>>> cannot be converted to
>>> org.apache.flink.api.connector.sink.Sink<java.lang.String,?,?,?>