Re: KafkaSink.builder setDeliveryGuarantee is not a member

2021-12-02 Thread Hang Ruan
Hi,
It seems to be an error in documents. `setDeliverGuarantee` is the method
of class `KafkaSinkBuilder`, .

It could be used like this : KafkaSink.builder().setDeliverGuarantee(xxx)

Lars Skjærven  于2021年12月2日周四 19:34写道:

> Hello,
> upgrading to 1.14 I bumped into an issue with the kafka sink builder when
> defining delivery guarantee:
>
> value setDeliveryGuarantee is not a member of
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[]
>
>
> Seems to be working with the default value (i.e. without mentioning
> setDeliveryGuarantee), but compile error when including it.
>
> Is it better to leave it with the default, and let the application cluster
> config define this ?
>
> I believe I build the KafkaSink according to the docs:
>
> import org.apache.flink.connector.base.DeliveryGuarantee
> import
> org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema,
> KafkaSink}
> import org.apache.flink.connector.kafka.source.KafkaSource
> import
> org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>
> val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff
> ]()
>   .setBootstrapServers("...")
>   .setRecordSerializer(
> KafkaRecordSerializationSchema
>   .builder[SomePBStuff]()
>   .setTopic("mytopic")
>   .setKeySerializationSchema((v: SomePBStuff) =>
> v.key.getBytes(StandardCharsets.UTF_8))
>   .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
>   .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>   .build()
>   )
>   .build()
>
>
> in build.sbt I have:
>
> ThisBuild / scalaVersion := "2.12.13"
> val flinkVersion = "1.14.0"
>
> val flinkDependencies = Seq(
>   "org.apache.flink" % "flink-runtime" % flinkVersion % Test,
>
>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion %
> "provided",
>   "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
>
>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>   "org.apache.flink" %% "flink-gelly-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
> )
>
>
>
>


Re: KafkaSink.builder setDeliveryGuarantee is not a member

2021-12-02 Thread Fabian Paul
Hi Lars,

Unfortunately, there is at the moment a small bug in our documentation
[1]. You can set the DeliveryGuarantee on the builder object and not
on the serialization schema. Sorry for the inconvenience.

Best,
Fabian

[1] https://github.com/apache/flink/pull/17971

On Thu, Dec 2, 2021 at 12:34 PM Lars Skjærven  wrote:
>
> Hello,
> upgrading to 1.14 I bumped into an issue with the kafka sink builder when 
> defining delivery guarantee:
>
> value setDeliveryGuarantee is not a member of 
> org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[]
>
>
> Seems to be working with the default value (i.e. without mentioning 
> setDeliveryGuarantee), but compile error when including it.
>
> Is it better to leave it with the default, and let the application cluster 
> config define this ?
>
> I believe I build the KafkaSink according to the docs:
>
> import org.apache.flink.connector.base.DeliveryGuarantee
> import org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema, 
> KafkaSink}
> import org.apache.flink.connector.kafka.source.KafkaSource
> import 
> org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
>
> val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]()
>   .setBootstrapServers("...")
>   .setRecordSerializer(
> KafkaRecordSerializationSchema
>   .builder[SomePBStuff]()
>   .setTopic("mytopic")
>   .setKeySerializationSchema((v: SomePBStuff) => 
> v.key.getBytes(StandardCharsets.UTF_8))
>   .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
>   .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>   .build()
>   )
>   .build()
>
>
> in build.sbt I have:
>
> ThisBuild / scalaVersion := "2.12.13"
> val flinkVersion = "1.14.0"
>
> val flinkDependencies = Seq(
>   "org.apache.flink" % "flink-runtime" % flinkVersion % Test,
>
>   "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
>   "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",
>
>   "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
>   "org.apache.flink" %% "flink-gelly-scala" % flinkVersion,
>   "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
> )
>
>
>


KafkaSink.builder setDeliveryGuarantee is not a member

2021-12-02 Thread Lars Skjærven
Hello,
upgrading to 1.14 I bumped into an issue with the kafka sink builder when
defining delivery guarantee:

value setDeliveryGuarantee is not a member of
org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaBuilder[]


Seems to be working with the default value (i.e. without mentioning
setDeliveryGuarantee), but compile error when including it.

Is it better to leave it with the default, and let the application cluster
config define this ?

I believe I build the KafkaSink according to the docs:

import org.apache.flink.connector.base.DeliveryGuarantee
import
org.apache.flink.connector.kafka.sink.{KafkaRecordSerializationSchema,
KafkaSink}
import org.apache.flink.connector.kafka.source.KafkaSource
import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer

val kafkaSink: KafkaSink[SomePBStuff] = KafkaSink.builder[SomePBStuff]()
  .setBootstrapServers("...")
  .setRecordSerializer(
KafkaRecordSerializationSchema
  .builder[SomePBStuff]()
  .setTopic("mytopic")
  .setKeySerializationSchema((v: SomePBStuff) =>
v.key.getBytes(StandardCharsets.UTF_8))
  .setValueSerializationSchema((v: SomePBStuff) => v.toByteArray)
  .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
  .build()
  )
  .build()


in build.sbt I have:

ThisBuild / scalaVersion := "2.12.13"
val flinkVersion = "1.14.0"

val flinkDependencies = Seq(
  "org.apache.flink" % "flink-runtime" % flinkVersion % Test,

  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-clients" % flinkVersion % "provided",

  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  "org.apache.flink" %% "flink-gelly-scala" % flinkVersion,
  "org.apache.flink" %% "flink-test-utils" % flinkVersion % Test,
)