Re: KafkaSink.builder setDeliveryGuarantee is not a member
Hi, It seems to be an error in documents. `setDeliverGuarantee` is the method of class `KafkaSinkBuilder`, . It could be used like this : KafkaSink.builder().setDeliverGuarantee(xxx) Lars Skjærven 于2021年12月2日周四 19:34写道: > Hello, > upgrading to 1.14 I bumped into an issue with the kafka sink builder when > defining delivery guarantee: > > value setDeliveryGuarantee is not a member of > org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[] > > > Seems to be working with the default value (i.e. without mentioning > setDeliveryGuarantee), but compile error when including it. > > Is it better to leave it with the default, and let the application cluster > config define this ? > > I believe I build the KafkaSink according to the docs: > > import org.apache.flink.connector.base.DeliveryGuarantee > import > org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, > KafkaSink} > import org.apache.flink.connector.kafka.source.KafkaSource > import > org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer > > val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff > ]() > .setBootstrapServers("...") > .setRecordSerializer( > KafkaRecordSerializationSchema > .builder[SomePBStuff]() > .setTopic("mytopic") > .setKeySerializationSchema((v: SomePBStuff) => > v.key.getBytes(StandardCharsets.UTF_8)) > .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray) > .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) > .build() > ) > .build() > > > in build.sbt I have: > > ThisBuild / scalaVersion := "2.12.13" > val flinkVersion = "1.14.0" > > val flinkDependencies = Seq( > "org.apache.flink" % "flink-runtime" % flinkVersion % Test, > > "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % > "provided", > "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided", > "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", > > "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, > "org.apache.flink" %% "flink-gelly-scala" % flinkVersion, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test, > ) > > > >
Re: KafkaSink.builder setDeliveryGuarantee is not a member
Hi Lars, Unfortunately, there is at the moment a small bug in our documentation [1]. You can set the DeliveryGuarantee on the builder object and not on the serialization schema. Sorry for the inconvenience. Best, Fabian [1] https://github.com/apache/flink/pull/17971 On Thu, Dec 2, 2021 at 12:34 PM Lars Skjærven wrote: > > Hello, > upgrading to 1.14 I bumped into an issue with the kafka sink builder when > defining delivery guarantee: > > value setDeliveryGuarantee is not a member of > org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[] > > > Seems to be working with the default value (i.e. without mentioning > setDeliveryGuarantee), but compile error when including it. > > Is it better to leave it with the default, and let the application cluster > config define this ? > > I believe I build the KafkaSink according to the docs: > > import org.apache.flink.connector.base.DeliveryGuarantee > import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, > KafkaSink} > import org.apache.flink.connector.kafka.source.KafkaSource > import > org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer > > val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]() > .setBootstrapServers("...") > .setRecordSerializer( > KafkaRecordSerializationSchema > .builder[SomePBStuff]() > .setTopic("mytopic") > .setKeySerializationSchema((v: SomePBStuff) => > v.key.getBytes(StandardCharsets.UTF_8)) > .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray) > .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) > .build() > ) > .build() > > > in build.sbt I have: > > ThisBuild / scalaVersion := "2.12.13" > val flinkVersion = "1.14.0" > > val flinkDependencies = Seq( > "org.apache.flink" % "flink-runtime" % flinkVersion % Test, > > "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", > "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", > "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided", > "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", > > "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, > "org.apache.flink" %% "flink-gelly-scala" % flinkVersion, > "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test, > ) > > >
KafkaSink.builder setDeliveryGuarantee is not a member
Hello, upgrading to 1.14 I bumped into an issue with the kafka sink builder when defining delivery guarantee: value setDeliveryGuarantee is not a member of org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[] Seems to be working with the default value (i.e. without mentioning setDeliveryGuarantee), but compile error when including it. Is it better to leave it with the default, and let the application cluster config define this ? I believe I build the KafkaSink according to the docs: import org.apache.flink.connector.base.DeliveryGuarantee import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, KafkaSink} import org.apache.flink.connector.kafka.source.KafkaSource import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]() .setBootstrapServers("...") .setRecordSerializer( KafkaRecordSerializationSchema .builder[SomePBStuff]() .setTopic("mytopic") .setKeySerializationSchema((v: SomePBStuff) => v.key.getBytes(StandardCharsets.UTF_8)) .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray) .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build() ) .build() in build.sbt I have: ThisBuild / scalaVersion := "2.12.13" val flinkVersion = "1.14.0" val flinkDependencies = Seq( "org.apache.flink" % "flink-runtime" % flinkVersion % Test, "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided", "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", "org.apache.flink" %% "flink-connector-kafka" % flinkVersion, "org.apache.flink" %% "flink-gelly-scala" % flinkVersion, "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test, )