Re: KafkaSink.builder setDeliveryGuarantee is not a member
Hi, It seems to be an error in documents. `setDeliverGuarantee` is the method of class `KafkaSinkBuilder`, . It could be used like this : KafkaSink.builder().setDeliverGuarantee(xxx) Lars Skjærven 于2021年12月2日周四 19:34写道: > Hello, > upgrading to 1.14 I bumped into an issue with the kafka sink builder when > defining delivery guarantee: > > value setDeliveryGuarantee is not a member of > org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[] > > > Seems to be working with the default value (i.e. without mentioning > setDeliveryGuarantee), but compile error when including it. > > Is it better to leave it with the default, and let the application cluster > config define this ? > > I believe I build the KafkaSink according to the docs: > > import org.apache.flink.connector.base.DeliveryGuarantee > import > org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, > KafkaSink} > import org.apache.flink.connector.kafka.source.KafkaSource > import > org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer > > val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff > ]() > .setBootstrapServers("...") > .setRecordSerializer( > KafkaRecordSerializationSchema > .builder[SomePBStuff]() > .setTopic("mytopic") > .setKeySerializationSchema((v: SomePBStuff) => > v.key.getBytes(StandardCharsets.UTF_8)) > .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray) > .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) > .build() > ) > .build() > > > in build.sbt I have: > > ThisBuild / scalaVersion := "2.12.13" > val flinkVersion = "1.14.0" > > val flinkDependencies = Seq( > "org.apache.flink" % "flink-runtime" % flinkVersion % Test, > > "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % > "provided", > "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided", > "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", > > "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, > "org.apache.flink" %% "flink-gelly-scala" % flinkVersion, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test, > ) > > > >
Re: KafkaSink.builder setDeliveryGuarantee is not a member
Hi Lars, Unfortunately, there is at the moment a small bug in our documentation [1]. You can set the DeliveryGuarantee on the builder object and not on the serialization schema. Sorry for the inconvenience. Best, Fabian [1] https://github.com/apache/flink/pull/17971 On Thu, Dec 2, 2021 at 12:34 PM Lars Skjærven wrote: > > Hello, > upgrading to 1.14 I bumped into an issue with the kafka sink builder when > defining delivery guarantee: > > value setDeliveryGuarantee is not a member of > org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[] > > > Seems to be working with the default value (i.e. without mentioning > setDeliveryGuarantee), but compile error when including it. > > Is it better to leave it with the default, and let the application cluster > config define this ? > > I believe I build the KafkaSink according to the docs: > > import org.apache.flink.connector.base.DeliveryGuarantee > import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, > KafkaSink} > import org.apache.flink.connector.kafka.source.KafkaSource > import > org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer > > val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]() > .setBootstrapServers("...") > .setRecordSerializer( > KafkaRecordSerializationSchema > .builder[SomePBStuff]() > .setTopic("mytopic") > .setKeySerializationSchema((v: SomePBStuff) => > v.key.getBytes(StandardCharsets.UTF_8)) > .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray) > .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) > .build() > ) > .build() > > > in build.sbt I have: > > ThisBuild / scalaVersion := "2.12.13" > val flinkVersion = "1.14.0" > > val flinkDependencies = Seq( > "org.apache.flink" % "flink-runtime" % flinkVersion % Test, > > "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", > "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided", > "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", > > "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, > "org.apache.flink" %% "flink-gelly-scala" % flinkVersion, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test, > ) > > >