[ 
https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515072#comment-16515072
 ] 

Rinat Sharipov commented on FLINK-9603:
---------------------------------------

Hi mates, here is my proposal for fixing this issue:
 * build the part path in a single method (or some kind of Builder), instead of 
repeating the same logic in multiple places across the code (DRY)
 * test that the part index is properly incremented when a part suffix is specified and
 ** an in-progress file exists
 ** a pending file exists
 ** a file in the final state exists
 * test the same cases when no part suffix is specified
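
A minimal sketch of the first point, under stated assumptions: the class name PartPathSketch and all of its helpers are hypothetical, plain strings and a Predicate stand in for Flink's Path and FileSystem so that the example is self-contained, and the "_" prefix with ".pending" / ".in-progress" suffixes are assumed naming conventions used only for illustration. The idea is that the name, including the suffix, is assembled in exactly one place, and the index check uses that same name.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Predicate;

public class PartPathSketch {

    // The only place where a part file name is assembled, suffix included.
    static String partName(String partPrefix, int subtaskIndex, long partCounter, String partSuffix) {
        String name = partPrefix + "-" + subtaskIndex + "-" + partCounter;
        return partSuffix != null ? name + partSuffix : name;
    }

    // Hypothetical stand-ins for getPendingPathFor / getInProgressPathFor (assumed naming).
    static String pendingName(String partName) {
        return "_" + partName + ".pending";
    }

    static String inProgressName(String partName) {
        return "_" + partName + ".in-progress";
    }

    // Returns the first counter >= startCounter for which no final, pending,
    // or in-progress file exists. The suffix is part of every name that is checked.
    static long nextFreeCounter(String partPrefix, int subtaskIndex, long startCounter,
                                String partSuffix, Predicate<String> exists) {
        long counter = startCounter;
        String name = partName(partPrefix, subtaskIndex, counter, partSuffix);
        while (exists.test(name)
                || exists.test(pendingName(name))
                || exists.test(inProgressName(name))) {
            counter++;
            name = partName(partPrefix, subtaskIndex, counter, partSuffix);
        }
        return counter;
    }

    public static void main(String[] args) {
        // Simulated bucket directory: one final, one pending, one in-progress file.
        Set<String> bucket = new HashSet<>(Arrays.asList(
                "part-0-0.avro",
                "_part-0-1.avro.pending",
                "_part-0-2.avro.in-progress"));
        System.out.println(nextFreeCounter("part", 0, 0, ".avro", bucket::contains));
    }
}
```

The three entries in the simulated bucket correspond to the three test cases listed above; passing null as the suffix covers the cases where no part suffix is specified.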

 

 

> Incorrect indexing of part files, when part suffix is specified 
> (FileAlreadyExistsException)
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9603
>                 URL: https://issues.apache.org/jira/browse/FLINK-9603
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.5.0
>            Reporter: Rinat Sharipov
>            Assignee: vinoyang
>            Priority: Major
>
> Hi mates, since the 1.5 release, BucketingSink can be configured with a suffix for 
> the part file. This is very useful when the file needs a specific 
> extension.
>   
>  While using it, I found an issue: when a new part file is created, it 
> gets the same part index as the file that was just closed. 
>  So, when Flink tries to move it into the final state, we get a 
> FileAlreadyExistsException.
>   
>  The problem is related to the following code:
>  *{color:#e32400}Here we are trying to find a part file index that is not 
> yet used in the bucket directory. The problem is that partSuffix is not 
> included in the path assembly. This means that the checked path never 
> exists{color}*
>  *{color:#e32400}and partCounter is never incremented.{color}*
>   
> {code:java}
> Path partPath = new Path(bucketPath,
>       partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>       fs.exists(getPendingPathFor(partPath)) ||
>       fs.exists(getInProgressPathFor(partPath))) {
>    bucketState.partCounter++;
>    partPath = new Path(bucketPath,
>          partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> *{color:#e32400}Before creating the writer, we append the partSuffix here, 
> but it should already be appended before the index checks{color}*
> {code:java}
> if (partSuffix != null) {
>    partPath = partPath.suffix(partSuffix);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
