Hi Huweihua, Thanks for the reply. Yes, we increased memory first. But we are still curious about the memory increasing with the new Kafka APIs/Serilizers.
On Mon, Apr 25, 2022 at 8:38 PM huweihua <huweihua....@gmail.com> wrote: > Hi, > > You can try to increase the memory of TaskManager. > If there is persistent OOM, you can dump the memory and check which part > is taking up memory. > > > 2022年4月25日 上午11:44,Hua Wei Chen <oscar.chen....@gmail.com> 写道: > > Hi all, > > Due to FlinkKafkaConsumer and FlinkKafkaProducer will be depreciated at > Flink 1.15*[1]*, we are trying to migrate the APIs to KafkaSource and > KafkaSink*[2]*. At the same time, we also modified the serilizers*[3]*. > Our Kafka settings are not changed*[4]*. > > The services are very stable before migration. However, we get OOM errors > *[5]* after the APIs migration. > > Does anyone encounter the same issue? Or anyone can give us suggestions > about the settings? > > Many Thanks! > > [1] Kafka | Apache Flink > <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction> > [2] new Kafka APIs > ``` > > def getKafkaSource[T: TypeInformation](config: Config, > topic: String, > parallelism: Int, > uid: String, > env: StreamExecutionEnvironment, > deserializer: > DeserializationSchema[T]): DataStream[T] = { > val properties = getKafkaCommonProperties(config) > > properties.put(ConsumerConfig.GROUP_ID_CONFIG, > config.getString("kafka.group.id")) > properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, > config.getString("kafka.session.timeout.ms")) > properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, > config.getString("kafka.receive.buffer.bytes")) > > > properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, > "3600000") > > val source = KafkaSource.builder[T]() > .setProperties(properties) > .setTopics(topic) > .setValueOnlyDeserializer(deserializer) > > .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST)) > .build() > > env > .fromSource(source, WatermarkStrategy.noWatermarks[T], uid) > .uid(uid) > .setParallelism(math.min(parallelism, env.getParallelism)) > .setMaxParallelism(parallelism) > } > > def getKafkaSink[T: TypeInformation](config: Config, > serializer: > KafkaRecordSerializationSchema[T]): KafkaSink[T] = { > val properties = getKafkaCommonProperties(config) > > properties.put(ProducerConfig.LINGER_MS_CONFIG, > config.getString("kafka.linger.ms")) > properties.put(ProducerConfig.BATCH_SIZE_CONFIG, > config.getString("kafka.batch.size")) > properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, > config.getString("kafka.compression.type")) > > KafkaSink.builder[T]() > .setKafkaProducerConfig(properties) > .setBootstrapServers(config.getString("kafka.bootstrap.servers")) > .setRecordSerializer(serializer) > .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) > .build() > } > > ``` > [3] New Serializer > > import java.lang > import java.nio.charset.StandardCharsets > import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema > import org.apache.kafka.clients.producer.ProducerRecord > import com.appier.rt.short_term_score.model.UserSTState > > class UserSTStateSerializer(topic: String) extends > KafkaRecordSerializationSchema[UserSTState] { > override def serialize(element: UserSTState, context: > KafkaRecordSerializationSchema.KafkaSinkContext, timestamp: lang.Long): > ProducerRecord[Array[Byte], Array[Byte]] = { > new ProducerRecord(topic, > element.toString.getBytes(StandardCharsets.UTF_8)) > } > } > > [4] Kafka Settings > > # Common > retries = "15" > retry.backoff.ms = "500" > reconnect.backoff.ms = "1000" > > # Producer > linger.ms = "5" > batch.size = "1048576" > compression.type = "gzip" > > # Consumer > group.id = "<censored>" > session.timeout.ms = "100000" > receive.buffer.bytes = "8388608" > > [5] *Error Message* > ``` > java.lang.OutOfMemoryError > > at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source) > at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source) > at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown > Source) > at java.base/java.io.ByteArrayOutputStream.write(Unknown Source) > at > java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown > Source) > at > java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown > Source) > at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source) > at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632) > at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62) > at > org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217) > at > org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138) > at com.sun.proxy.$Proxy64.submitTask(Unknown Source) > at > org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60) > at > org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589) > at > java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown > Source) > at > java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) > at java.base/java.util.concurrent.FutureTask.run(Unknown Source) > at > java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown > Source) > at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown > Source) > at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown > Source) > at java.base/java.lang.Thread.run(Unknown Source) > Suppressed: java.lang.OutOfMemoryError > at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown > Source) > at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source) > at > java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source) > at java.base/java.io.ByteArrayOutputStream.write(Unknown Source) > at > java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown > Source) > at > java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown > Source) > at java.base/java.io.ObjectOutputStream.flush(Unknown Source) > at java.base/java.io.ObjectOutputStream.close(Unknown Source) > at > org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635) > ... 15 more > > ``` > > -- > *Regards,* > *Oscar / Chen Hua Wei* > > >