Re: KafkaSink.builder setDeliveryGuarantee is not a member

2021-12-02 Thread Hang Ruan
Hi,
It seems to be an error in documents. `setDeliverGuarantee` is the method
of class `KafkaSinkBuilder`, .

It could be used like this : KafkaSink.builder().setDeliverGuarantee(xxx)

Lars Skjærven  于2021年12月2日周四 19:34写道:

> Hello,
> upgrading to 1.14 I bumped into an issue with the kafka sink builder when
> defining delivery guarantee:
>
> value setDeliveryGuarantee is not a member of
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[]
>
>
> Seems to be working with the default value (i.e. without mentioning
> setDeliveryGuarantee), but compile error when including it.
>
> Is it better to leave it with the default, and let the application cluster
> config define this ?
>
> I believe I build the KafkaSink according to the docs:
>
> import org.apache.flink.connector.base.DeliveryGuarantee
> import
> org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema,
> KafkaSink}
> import org.apache.flink.connector.kafka.source.KafkaSource
> import
> org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>
> val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff
> ]()
>   .setBootstrapServers("...")
>   .setRecordSerializer(
> KafkaRecordSerializationSchema
>   .builder[SomePBStuff]()
>   .setTopic("mytopic")
>   .setKeySerializationSchema((v: SomePBStuff) =>
> v.key.getBytes(StandardCharsets.UTF_8))
>   .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
>   .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>   .build()
>   )
>   .build()
>
>
> in build.sbt I have:
>
> ThisBuild / scalaVersion := "2.12.13"
> val flinkVersion = "1.14.0"
>
> val flinkDependencies = Seq(
>   "org.apache.flink" % "flink-runtime" % flinkVersion % Test,
>
>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion %
> "provided",
>   "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
>
>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>   "org.apache.flink" %% "flink-gelly-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
> )
>
>
>
>


Re: KafkaSink.builder setDeliveryGuarantee is not a member

2021-12-02 Thread Fabian Paul
Hi Lars,

Unfortunately, there is at the moment a small bug in our documentation
[1]. You can set the DeliveryGuarantee on the builder object and not
on the serialization schema. Sorry for the inconvenience.

Best,
Fabian

[1] https://github.com/apache/flink/pull/17971

On Thu, Dec 2, 2021 at 12:34 PM Lars Skjærven  wrote:
>
> Hello,
> upgrading to 1.14 I bumped into an issue with the kafka sink builder when 
> defining delivery guarantee:
>
> value setDeliveryGuarantee is not a member of 
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[]
>
>
> Seems to be working with the default value (i.e. without mentioning 
> setDeliveryGuarantee), but compile error when including it.
>
> Is it better to leave it with the default, and let the application cluster 
> config define this ?
>
> I believe I build the KafkaSink according to the docs:
>
> import org.apache.flink.connector.base.DeliveryGuarantee
> import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, 
> KafkaSink}
> import org.apache.flink.connector.kafka.source.KafkaSource
> import 
> org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>
> val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]()
>   .setBootstrapServers("...")
>   .setRecordSerializer(
> KafkaRecordSerializationSchema
>   .builder[SomePBStuff]()
>   .setTopic("mytopic")
>   .setKeySerializationSchema((v: SomePBStuff) => 
> v.key.getBytes(StandardCharsets.UTF_8))
>   .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
>   .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>   .build()
>   )
>   .build()
>
>
> in build.sbt I have:
>
> ThisBuild / scalaVersion := "2.12.13"
> val flinkVersion = "1.14.0"
>
> val flinkDependencies = Seq(
>   "org.apache.flink" % "flink-runtime" % flinkVersion % Test,
>
>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
>
>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>   "org.apache.flink" %% "flink-gelly-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
> )
>
>
>