[ 
https://issues.apache.org/jira/browse/FLINK-9558?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rinat Sharipov updated FLINK-9558:
----------------------------------
    Description: 
Hi mates, we have some Flink jobs that write data from Kafka into HDFS using the 
BucketingSink.
 For various reasons, those jobs run without checkpointing. For now this is not 
a big problem for us if some files remain open when a job is restarted.
  
 Periodically, those jobs fail with an *OutOfMemory* exception, and I think I have 
found a strange thing in the implementation of BucketingSink.
  
 During the sink's lifecycle it keeps a state object, implemented as a map, where 
the key is a bucket path and the value is a per-bucket state that holds information 
about the currently open file and the list of pending files.
 After inspecting a heap dump, I found that this state holds information about 
roughly 1,000 buckets, and all of it weighs about 120 MB.
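
 For clarity, here is a simplified sketch of that layout (field names follow the 
internal BucketingSink.BucketState class, but this is an approximation, not the 
exact source):
{code:java}
// Simplified sketch: one entry per bucket path, each holding the file
// currently being written plus the files waiting to be committed.
Map<String, BucketState<T>> bucketStates = new HashMap<>();

class BucketState<T> {
    String currentFile;                                 // in-progress part file
    List<String> pendingFiles;                          // closed, not yet committed
    Map<Long, List<String>> pendingFilesPerCheckpoint;  // pending files per checkpoint id
    transient boolean isWriterOpen;                     // is a writer currently open?
}
{code}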
  
 I’ve looked through the code and found that buckets are removed from the state 
only in the *notifyCheckpointComplete* method:
{code:java}
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {
  Iterator<Map.Entry<String, BucketState<T>>> bucketStatesIt =
      state.bucketStates.entrySet().iterator();
  while (bucketStatesIt.hasNext()) {
    BucketState<T> bucketState = bucketStatesIt.next().getValue();
    if (!bucketState.isWriterOpen &&
        bucketState.pendingFiles.isEmpty() &&
        bucketState.pendingFilesPerCheckpoint.isEmpty()) {

      // We've dealt with all the pending files and the writer for this bucket
      // is not currently open. Therefore this bucket is currently inactive
      // and we can remove it from our state.
      bucketStatesIt.remove();
    }
  }
}
{code}
So this looks like an issue when the sink is used in a checkpoint-less 
environment: *notifyCheckpointComplete* is never invoked when checkpointing is 
disabled, so data is always added to the state but never removed from it.
  
 Of course, we could enable checkpointing and use one of the available state 
backends, but to me this still looks like unexpected behaviour: the API gives me 
the option to run the job without checkpointing, but if I actually do so, the 
sink component eventually fails with an exception.
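
 For reference, a minimal sketch of that workaround (the interval and the backend 
are only illustrative choices, not a recommendation):
{code:java}
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// With checkpointing enabled, notifyCheckpointComplete() fires after each
// successful checkpoint and inactive buckets are evicted from the sink state.
env.enableCheckpointing(60_000);  // e.g. every 60 seconds
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"));
{code}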

In my opinion, we should at least document this behaviour, or add a fail-fast 
check so that the sink refuses to start in an environment with checkpointing 
disabled.
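
A minimal sketch of what such a fail-fast check could look like, assuming it 
lives in the sink's open() method (the exact hook and the message are 
hypothetical, just an illustration of the idea):
{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;

// Hypothetical: reject checkpoint-less execution up front instead of
// letting the bucket state grow until the job dies with OutOfMemory.
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    StreamingRuntimeContext ctx = (StreamingRuntimeContext) getRuntimeContext();
    if (!ctx.isCheckpointingEnabled()) {
        throw new IllegalStateException(
            "BucketingSink requires checkpointing to be enabled; " +
            "without it the bucket state grows without bound.");
    }
}
{code}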

 


> Memory leaks during usage of bucketing-sink with disabled checkpointing
> -----------------------------------------------------------------------
>
>                 Key: FLINK-9558
>                 URL: https://issues.apache.org/jira/browse/FLINK-9558
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.3.0
>            Reporter: Rinat Sharipov
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
