Hi, 

You can try increasing the TaskManager memory.
If the OOM persists, you can take a heap dump and check which part is
taking up memory.
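
If it helps, here is a minimal sketch of both steps via flink-conf.yaml
(the memory size and dump path below are placeholder values, adjust them
to your environment):

```
# flink-conf.yaml -- placeholder values, tune for your deployment
# 1. Raise the TaskManager's total memory budget.
taskmanager.memory.process.size: 4096m

# 2. Have the JVM write a heap dump automatically on OutOfMemoryError,
#    so you can inspect it afterwards with Eclipse MAT, VisualVM, etc.
env.java.opts: "-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/flink-dumps"
```

For a running process you can also trigger a dump manually with
`jmap -dump:live,format=b,file=heap.hprof <pid>`.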


> On Apr 25, 2022, at 11:44 AM, Hua Wei Chen <oscar.chen....@gmail.com> wrote:
> 
> Hi all,
> 
> Since FlinkKafkaConsumer and FlinkKafkaProducer are deprecated as of Flink
> 1.15[1], we are migrating to the KafkaSource and KafkaSink APIs[2].
> At the same time, we also rewrote the serializers[3]. Our Kafka settings are
> unchanged[4].
> 
> The services were very stable before the migration. However, we have been
> getting OOM errors[5] since migrating to the new APIs.
> 
> Has anyone encountered the same issue? Or can anyone give us suggestions about
> the settings?
> 
> Many Thanks!
> 
> [1] Kafka | Apache Flink 
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
> [2] New Kafka APIs
> ```
> def getKafkaSource[T: TypeInformation](config: Config,
>                                        topic: String,
>                                        parallelism: Int,
>                                        uid: String,
>                                        env: StreamExecutionEnvironment,
>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>   val properties = getKafkaCommonProperties(config)
> 
>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
> 
>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
> 
>   val source = KafkaSource.builder[T]()
>     .setProperties(properties)
>     .setTopics(topic)
>     .setValueOnlyDeserializer(deserializer)
>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>     .build()
> 
>   env
>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>     .uid(uid)
>     .setParallelism(math.min(parallelism, env.getParallelism))
>     .setMaxParallelism(parallelism)
> }
> 
> def getKafkaSink[T: TypeInformation](config: Config,
>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>   val properties = getKafkaCommonProperties(config)
> 
>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
> 
>   KafkaSink.builder[T]()
>     .setKafkaProducerConfig(properties)
>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>     .setRecordSerializer(serializer)
>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     .build()
> }
> ```
> [3] New Serializer
> ```
> import java.lang
> import java.nio.charset.StandardCharsets
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
> import org.apache.kafka.clients.producer.ProducerRecord
> import com.appier.rt.short_term_score.model.UserSTState
> 
> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>   override def serialize(element: UserSTState,
>                          context: KafkaRecordSerializationSchema.KafkaSinkContext,
>                          timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>   }
> }
> ```
> [4] Kafka Settings
> ```
> # Common
> retries = "15"
> retry.backoff.ms = "500"
> reconnect.backoff.ms = "1000"
> 
> # Producer
> linger.ms = "5"
> batch.size = "1048576"
> compression.type = "gzip"
> 
> # Consumer
> group.id = "<censored>"
> session.timeout.ms = "100000"
> receive.buffer.bytes = "8388608"
> ```
> [5] Error Message
> ```
> java.lang.OutOfMemoryError
>       at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>       at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>       at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>       at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>       at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>       at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>       at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>       at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>       at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>       at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>       at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>       at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>       at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>       at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>       at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>       at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>       at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>       at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>       at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>       at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>       at java.base/java.lang.Thread.run(Unknown Source)
>       Suppressed: java.lang.OutOfMemoryError
>               at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>               at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>               at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>               at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>               at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>               at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>               at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>               at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>               at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>               ... 15 more
> ```
> 
> -- 
> Regards,
> Oscar / Chen Hua Wei
