[ 
https://issues.apache.org/jira/browse/FLINK-30960?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17688044#comment-17688044
 ] 

Bo Shen commented on FLINK-30960:
---------------------------------

Just to share my thoughts on this issue:

The issue probably stems from the assumption that the data stream is mostly 
continuous: the exception thrown in the "flushing" thread is cached for 
processing in the main task thread.

The main task thread is where the exception can be thrown up through the 
abstraction layers and ultimately caught by the task failure handling logic.

There seems to be no way for the "flushing" thread to properly interact with 
the main task thread (maybe interrupt it?). I tried to utilize the "mailbox" 
mechanism in StreamTask, but the abstraction layers are pretty thick to 
penetrate.
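
To make the pattern concrete, here is a simplified sketch of how I read it 
(paraphrased, not the actual connector code; names loosely follow 
JdbcOutputFormat):

{code:java}
// Simplified sketch of the cache-and-check pattern (paraphrased, not the
// actual connector code; names loosely follow JdbcOutputFormat).
class BatchedJdbcWriterSketch {

    // Written by the background "flushing" thread, read by the main task thread.
    private volatile Exception flushException;

    // Runs on the main task thread, but only when a new record arrives.
    void writeRecord(Object record) {
        checkFlushException();   // the only place the cached failure surfaces to the task
        // ... buffer the record, flush when the batch is full ...
    }

    void flush() {
        checkFlushException();   // flush() also checks the cached exception first
        // ... execute the JDBC batch ...
    }

    private void checkFlushException() {
        if (flushException != null) {
            throw new RuntimeException("Writing records to JDBC failed.", flushException);
        }
    }
}
{code}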

 

As far as I can see, the problem with a pulse-like data stream is twofold:
 # Processing of the exception from the "flushing" thread is delayed until new 
data arrives. This delay could be hours or days depending on the use case. 
Ideally the task would fail fast.
 # The "flushing" thread repeatedly throws a new RuntimeException wrapping the 
last exception as the cause. This builds a growing exception hierarchy and eats 
up memory. When new data finally arrives, stringifying that exception can 
easily cause a stack overflow or out-of-memory error (see the sketch after this 
list).
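
Continuing the sketch above, the second point is a consequence of flush() 
checking the cached exception itself: every tick of the background flusher 
re-wraps whatever is already cached (my reading of the code, so please 
double-check):

{code:java}
// Continuing the sketch above: what the background flusher does on every flush interval.
void scheduledFlush() {
    try {
        flush();                 // flush() -> checkFlushException() -> throws
                                 // new RuntimeException("Writing records to JDBC failed.", flushException)
    } catch (Exception e) {
        flushException = e;      // the wrapper itself is cached, so the next tick wraps it again:
                                 // RuntimeException -> RuntimeException -> ... -> original cause
    }
}
{code}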

 

If it's acceptable, I would like to contribute a patch for the latter issue, 
which could be fixed fairly easily by keeping the exception already thrown by 
the "flushing" thread and simply not wrapping and throwing it again (rough 
sketch below).
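
Roughly, continuing the sketch above, something along these lines (only a 
sketch; the actual patch would also need to cover the checkFlushException() 
call inside flush()):

{code:java}
// Rough sketch of the proposed fix: keep only the first failure and never re-wrap it.
void scheduledFlush() {
    try {
        flush();
    } catch (Exception e) {
        if (flushException == null) {
            flushException = e;  // later ticks leave the cached exception untouched,
                                 // so the cause chain no longer grows
        }
    }
}
{code}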

 

For the former issue, some major work may be needed on the exception handling 
mechanism across thread boundaries. I hope someone more familiar with the 
project can shed some light on this.
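
For reference, the direction I was trying to go was something like the 
following. This is purely hypothetical: as far as I can tell the 
OutputFormat-based writer has no way to obtain the task's MailboxExecutor 
today, so the sketch only illustrates the idea.

{code:java}
import org.apache.flink.api.common.operators.MailboxExecutor;

import java.io.IOException;

// Hypothetical: if the flushing thread could reach the task's MailboxExecutor,
// it could hand the failure to the main thread right away instead of caching it.
void scheduledFlush(MailboxExecutor mailboxExecutor) {
    try {
        flush();
    } catch (Exception e) {
        // Enqueue a mail that rethrows the failure; the main thread would then
        // fail the task promptly instead of waiting for the next record.
        mailboxExecutor.execute(
                () -> { throw new IOException("Writing records to JDBC failed.", e); },
                "report JDBC flush failure");
    }
}
{code}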

 

> OutOfMemory error using jdbc sink
> ---------------------------------
>
>                 Key: FLINK-30960
>                 URL: https://issues.apache.org/jira/browse/FLINK-30960
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.16.0
>         Environment: Using SQL-Client + TableAPI
> Processing Kafka messages to MySQL databases.
>            Reporter: Bo Shen
>            Priority: Major
>         Attachments: heap-memory-metrics.png, 
> image-2023-02-08-16-58-27-765.png, taskmanager.log
>
>
> Here in production I have an environment consisting of a Kafka source, to 
> which third-party providers push a few thousand messages every hour, and I 
> use the Flink JDBC sink to store those messages into a MySQL database.
> Recently there have been a few TaskManager process crashes. My investigation 
> suggests that the cause boils down to the way exceptions are handled in the 
> JDBC batched mode.
>  
> When writing to JDBC fails in batched mode due to some error like 
> DataTruncation, the exception is stored in the field "flushException", waiting 
> to be processed by the main task thread.
> This exception is only checked on the next call to "writeRecords". In my 
> case, if the exception happens to occur when processing the last batch of 
> records, and there are no further records coming from the source for the next 
> hour, the flushing thread simply repeatedly throws a new RuntimeException 
> wrapping the last "flushException" as the cause. The new RuntimeException is 
> stored as the new "flushException". 
> Hence, every time the RuntimeException is thrown, the exception hierarchy gets 
> bigger, and eventually, before the exception is processed on the main thread, 
> the JVM runs out of memory, which causes an unexpected process crash. 
> !image-2023-02-08-16-58-27-765.png!
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
