Hi Huweihua,

Thanks for the reply. Yes, we increased the memory first.
But we are still curious about why memory usage increases with the new
Kafka APIs/serializers.


On Mon, Apr 25, 2022 at 8:38 PM huweihua <huweihua....@gmail.com> wrote:

> Hi,
>
> You can try to increase the TaskManager memory.
> If the OOM persists, you can dump the heap and check which part is
> taking up memory.
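>
> For example, a minimal sketch of the relevant configuration (the size and
> dump path below are placeholders, not recommendations):
>
> ```
> # flink-conf.yaml -- illustrative values only
> taskmanager.memory.process.size: 4096m
> # write a heap dump whenever the JVM hits an OutOfMemoryError
> env.java.opts.taskmanager: -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/taskmanager.hprof
> ```
>
> The resulting .hprof file can then be opened with a heap analyzer such as
> Eclipse MAT to see which objects dominate the heap.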
>
>
> On Apr 25, 2022 at 11:44 AM, Hua Wei Chen <oscar.chen....@gmail.com> wrote:
>
> Hi all,
>
> Because FlinkKafkaConsumer and FlinkKafkaProducer will be deprecated in
> Flink 1.15 *[1]*, we are migrating to the new KafkaSource and KafkaSink
> APIs *[2]*. At the same time, we also rewrote the serializers *[3]*. Our
> Kafka settings are unchanged *[4]*.
>
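> For reference, the pipeline before the migration used the legacy connector,
> roughly like this (a simplified sketch, not our exact code; the topic names
> and the helper's parameters are placeholders):
>
> ```
>
> // Legacy FlinkKafkaConsumer/FlinkKafkaProducer pair we are migrating away
> // from (illustrative only; "in-topic"/"out-topic" are placeholders).
> def legacyPipeline(env: StreamExecutionEnvironment,
>                    properties: java.util.Properties,
>                    deserializer: DeserializationSchema[UserSTState],
>                    serializationSchema: SerializationSchema[UserSTState]): Unit = {
>   val stream = env.addSource(new FlinkKafkaConsumer[UserSTState]("in-topic", deserializer, properties))
>   stream.addSink(new FlinkKafkaProducer[UserSTState]("out-topic", serializationSchema, properties))
> }
>
> ```
>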
> The services were very stable before the migration. However, we have been
> getting OOM errors *[5]* since migrating the APIs.
>
> Has anyone encountered the same issue? Or can anyone give us suggestions
> about the settings?
>
> Many Thanks!
>
> [1] Kafka | Apache Flink
> <https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#kafka-sourcefunction>
> [2] New Kafka APIs
> ```
>
> def getKafkaSource[T: TypeInformation](config: Config,
>                                        topic: String,
>                                        parallelism: Int,
>                                        uid: String,
>                                        env: StreamExecutionEnvironment,
>                                        deserializer: DeserializationSchema[T]): DataStream[T] = {
>   val properties = getKafkaCommonProperties(config)
>
>   properties.put(ConsumerConfig.GROUP_ID_CONFIG, config.getString("kafka.group.id"))
>   properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, config.getString("kafka.session.timeout.ms"))
>   properties.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, config.getString("kafka.receive.buffer.bytes"))
>
>   // Note: this key ("flink.partition-discovery.interval-millis") belongs to the
>   // legacy FlinkKafkaConsumer; the new KafkaSource reads
>   // KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS ("partition.discovery.interval.ms").
>   properties.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3600000")
>
>   val source = KafkaSource.builder[T]()
>     .setProperties(properties)
>     .setTopics(topic)
>     .setValueOnlyDeserializer(deserializer)
>     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
>     .build()
>
>   env
>     .fromSource(source, WatermarkStrategy.noWatermarks[T], uid)
>     .uid(uid)
>     .setParallelism(math.min(parallelism, env.getParallelism))
>     .setMaxParallelism(parallelism)
> }
>
> def getKafkaSink[T: TypeInformation](config: Config,
>                                      serializer: KafkaRecordSerializationSchema[T]): KafkaSink[T] = {
>   val properties = getKafkaCommonProperties(config)
>
>   properties.put(ProducerConfig.LINGER_MS_CONFIG, config.getString("kafka.linger.ms"))
>   properties.put(ProducerConfig.BATCH_SIZE_CONFIG, config.getString("kafka.batch.size"))
>   properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.getString("kafka.compression.type"))
>
>   KafkaSink.builder[T]()
>     .setKafkaProducerConfig(properties)
>     .setBootstrapServers(config.getString("kafka.bootstrap.servers"))
>     .setRecordSerializer(serializer)
>     .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
>     .build()
> }
>
> ```
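>
> For context, a sketch of how these helpers could be wired together (the
> topic names, parallelism, and UserSTStateDeserializer are hypothetical
> placeholders, not our real identifiers):
>
> ```
>
> // Hypothetical wiring of the helpers above; illustrative only.
> val env = StreamExecutionEnvironment.getExecutionEnvironment
>
> val states: DataStream[UserSTState] =
>   getKafkaSource[UserSTState](config, "user-st-state-in", 8, "user-st-source", env,
>     new UserSTStateDeserializer())
>
> states.sinkTo(getKafkaSink[UserSTState](config, new UserSTStateSerializer("user-st-state-out")))
>
> ```
>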
> [3] New Serializer
>
> ```
>
> import java.lang
> import java.nio.charset.StandardCharsets
> import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema
> import org.apache.kafka.clients.producer.ProducerRecord
> import com.appier.rt.short_term_score.model.UserSTState
>
> class UserSTStateSerializer(topic: String) extends KafkaRecordSerializationSchema[UserSTState] {
>   override def serialize(element: UserSTState,
>                          context: KafkaRecordSerializationSchema.KafkaSinkContext,
>                          timestamp: lang.Long): ProducerRecord[Array[Byte], Array[Byte]] = {
>     // Value-only record: no key; the payload is the UTF-8 bytes of element.toString.
>     new ProducerRecord(topic, element.toString.getBytes(StandardCharsets.UTF_8))
>   }
> }
>
> ```
>
> [4] Kafka Settings
>
> ```
> # Common
> retries = "15"
> retry.backoff.ms = "500"
> reconnect.backoff.ms = "1000"
>
> # Producer
> linger.ms = "5"
> batch.size = "1048576"
> compression.type = "gzip"
>
> # Consumer
> group.id = "<censored>"
> session.timeout.ms = "100000"
> receive.buffer.bytes = "8388608"
> ```
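>
> For completeness, getKafkaCommonProperties (referenced in [2] but not shown)
> maps the "Common" block above into a java.util.Properties. A minimal sketch,
> assuming the config keys follow the same "kafka.*" pattern as in [2]:
>
> ```
>
> def getKafkaCommonProperties(config: Config): java.util.Properties = {
>   val properties = new java.util.Properties()
>   // Shared producer/consumer settings, mirroring the "Common" block above.
>   properties.put("bootstrap.servers", config.getString("kafka.bootstrap.servers"))
>   properties.put("retries", config.getString("kafka.retries"))
>   properties.put("retry.backoff.ms", config.getString("kafka.retry.backoff.ms"))
>   properties.put("reconnect.backoff.ms", config.getString("kafka.reconnect.backoff.ms"))
>   properties
> }
>
> ```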
>
> [5] Error Message
> ```
> java.lang.OutOfMemoryError
>
>       at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>       at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>       at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>       at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>       at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>       at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.setBlockDataMode(Unknown Source)
>       at java.base/java.io.ObjectOutputStream.writeObject0(Unknown Source)
>       at java.base/java.io.ObjectOutputStream.writeObject(Unknown Source)
>       at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)
>       at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
>       at org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation.<init>(RemoteRpcInvocation.java:55)
>       at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.createRpcInvocationMessage(AkkaInvocationHandler.java:302)
>       at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invokeRpc(AkkaInvocationHandler.java:217)
>       at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.invoke(AkkaInvocationHandler.java:138)
>       at com.sun.proxy.$Proxy64.submitTask(Unknown Source)
>       at org.apache.flink.runtime.jobmaster.RpcTaskManagerGateway.submitTask(RpcTaskManagerGateway.java:60)
>       at org.apache.flink.runtime.executiongraph.Execution.lambda$deploy$4(Execution.java:589)
>       at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
>       at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
>       at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>       at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
>       at java.base/java.lang.Thread.run(Unknown Source)
>       Suppressed: java.lang.OutOfMemoryError
>               at java.base/java.io.ByteArrayOutputStream.hugeCapacity(Unknown Source)
>               at java.base/java.io.ByteArrayOutputStream.grow(Unknown Source)
>               at java.base/java.io.ByteArrayOutputStream.ensureCapacity(Unknown Source)
>               at java.base/java.io.ByteArrayOutputStream.write(Unknown Source)
>               at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.drain(Unknown Source)
>               at java.base/java.io.ObjectOutputStream$BlockDataOutputStream.flush(Unknown Source)
>               at java.base/java.io.ObjectOutputStream.flush(Unknown Source)
>               at java.base/java.io.ObjectOutputStream.close(Unknown Source)
>               at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:635)
>               ... 15 more
>
> ```
>
> --
> *Regards,*
> *Oscar / Chen Hua Wei*