[ https://issues.apache.org/jira/browse/FLINK-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16515083#comment-16515083 ]

ASF GitHub Bot commented on FLINK-9603:
---------------------------------------

GitHub user kent2171 opened a pull request:

    https://github.com/apache/flink/pull/6176

    [FLINK-9603][connector-filesystem] fix part indexing, when part suffix is specified

    This pull request fixes the incorrect part file index lookup that occurs when a part suffix is specified.
    The part file path should be assembled with the part suffix before checking whether the file exists.
    
    The following tests, which verify part file indexing, have been added:
    - testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInProgressState
    - testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInPendingState
    - testThatPartIndexIsIncrementedWhenPartSuffixIsSpecifiedAndPreviousPartFileInFinalState
    - testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInProgressState
    - testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInPendingState
    - testThatPartIndexIsIncrementedWhenPartSuffixIsNotSpecifiedAndPreviousPartFileInFinalState
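The six scenarios above can be sketched in plain Java, without Flink or HDFS. This is a minimal, hypothetical model, not the test code from the pull request: a Set<String> of file names stands in for the bucket directory, and the part prefix `part`, subtask `0`, suffix `.avro`, and the `_` prefix with `.pending`/`.in-progress` suffixes for pending and in-progress files are all assumptions (the latter chosen to resemble BucketingSink's default naming). In each case a part file with index 0 already exists in one state, and the next index is expected to be 1.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical model of the six indexing scenarios (not the PR's test code).
 * A Set of file names models the bucket directory; the "_" prefix and the
 * ".pending" / ".in-progress" suffixes are an assumed naming scheme.
 */
class PartIndexScenarios {

    // Assumed part prefix "part" and subtask index 0; the suffix is optional.
    static String partName(int counter, String suffix) {
        String name = "part-0-" + counter;
        return suffix != null ? name + suffix : name;
    }

    // On-disk name of a part file in the given state (assumed naming scheme).
    static String onDiskName(String state, String part) {
        switch (state) {
            case "in-progress": return "_" + part + ".in-progress";
            case "pending":     return "_" + part + ".pending";
            default:            return part; // final state
        }
    }

    // A part name is taken if it exists in any of the three states.
    static boolean taken(Set<String> bucket, String part) {
        return bucket.contains(onDiskName("final", part))
                || bucket.contains(onDiskName("pending", part))
                || bucket.contains(onDiskName("in-progress", part));
    }

    // Fixed search: the suffix is part of every name that is checked.
    static int nextIndex(Set<String> bucket, int counter, String suffix) {
        while (taken(bucket, partName(counter, suffix))) {
            counter++;
        }
        return counter;
    }

    public static void main(String[] args) {
        for (String suffix : new String[] {".avro", null}) {
            for (String state : new String[] {"in-progress", "pending", "final"}) {
                Set<String> bucket = new HashSet<>();
                bucket.add(onDiskName(state, partName(0, suffix)));
                int next = nextIndex(bucket, 0, suffix);
                System.out.println("suffix=" + suffix + ", previous part " + state
                        + " -> next index " + next);
            }
        }
    }
}
```

Running `main` should report index 1 for all six combinations, because every name that is checked already carries the suffix when one is configured.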

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kent2171/flink FLINK-9603_fix_part_idx_inc

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6176.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6176
    
----
commit c08b81044bf80d5633a9936afb73fadf021cef47
Author: Rinat Sharipov <r.sharipov@...>
Date:   2018-06-17T13:03:54Z

    FLINK-9603 1. all logic responsible for path assembly moved into a method; 2. test the part file indexing logic when in-progress/pending/final part files already exist in the bucket; 3. test the same logic when the part file has a suffix

----


> Incorrect indexing of part files, when part suffix is specified (FileAlreadyExistsException)
> --------------------------------------------------------------------------------------------
>
>                 Key: FLINK-9603
>                 URL: https://issues.apache.org/jira/browse/FLINK-9603
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.5.0
>            Reporter: Rinat Sharipov
>            Assignee: vinoyang
>            Priority: Major
>
> Hi mates, since the 1.5 release, BucketingSink has the ability to configure the suffix of part files. It’s very useful when it’s necessary to set a specific file extension.
>   
>  While using it, I’ve found an issue: when a new part file is created, it gets the same part index as the file that was just closed.
>  So, when Flink tries to move it into the final state, a FileAlreadyExistsException is thrown.
>   
>  This problem is related to the following code:
>  *{color:#e32400}Here we try to find the next part index whose file doesn’t exist yet in the bucket directory. The problem is that partSuffix is not included in the path assembly, so the checked path never exists{color}*
>  *{color:#e32400}and this loop never increments partCounter.{color}*
>   
> {code:java}
> Path partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> while (fs.exists(partPath) ||
>       fs.exists(getPendingPathFor(partPath)) ||
>       fs.exists(getInProgressPathFor(partPath))) {
>    bucketState.partCounter++;
>    partPath = new Path(bucketPath, partPrefix + "-" + subtaskIndex + "-" + bucketState.partCounter);
> }
> bucketState.creationTime = processingTimeService.getCurrentProcessingTime();
> {code}
> *{color:#e32400}Before creating the writer, we append the partSuffix here, but it should already be appended before the index checks{color}*
> {code:java}
> if (partSuffix != null) {
>    partPath = partPath.suffix(partSuffix);
> }
> {code}
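To make the difference concrete, here is a minimal in-memory model of the loop quoted above and of the suggested fix. It is a hypothetical sketch, not BucketingSink code: a Set<String> of file names replaces fs.exists(), the method names are invented for illustration, and the `_` prefix with `.pending`/`.in-progress` suffixes is an assumed naming scheme for pending and in-progress files.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical in-memory model of the part index search from the report.
 * A Set of file names stands in for fs.exists() on the bucket directory.
 * Method names and the "_" / ".pending" / ".in-progress" naming are assumptions.
 */
class PartIndexSearch {

    // A name is taken if it exists as a final, pending or in-progress file.
    static boolean taken(Set<String> bucket, String name) {
        return bucket.contains(name)
                || bucket.contains("_" + name + ".pending")
                || bucket.contains("_" + name + ".in-progress");
    }

    // Hypothetical helper: builds the full part file name, suffix included.
    static String partName(String prefix, int subtask, int counter, String suffix) {
        String name = prefix + "-" + subtask + "-" + counter;
        return suffix != null ? name + suffix : name;
    }

    // Reported behavior: checks run on the name WITHOUT the suffix,
    // and the suffix is appended only after the loop has finished.
    static String nextPartNameBuggy(Set<String> bucket, String prefix, int subtask,
                                    int counter, String suffix) {
        String name = partName(prefix, subtask, counter, null);
        while (taken(bucket, name)) {
            counter++;
            name = partName(prefix, subtask, counter, null);
        }
        return suffix != null ? name + suffix : name;
    }

    // Suggested fix: assemble the suffixed name first, then run the checks on it.
    static String nextPartNameFixed(Set<String> bucket, String prefix, int subtask,
                                    int counter, String suffix) {
        String name = partName(prefix, subtask, counter, suffix);
        while (taken(bucket, name)) {
            counter++;
            name = partName(prefix, subtask, counter, suffix);
        }
        return name;
    }

    public static void main(String[] args) {
        Set<String> bucket = new HashSet<>();
        bucket.add("part-0-0.avro");              // final part file
        bucket.add("_part-0-1.avro.pending");     // pending part file
        bucket.add("_part-0-2.avro.in-progress"); // in-progress part file

        System.out.println(nextPartNameBuggy(bucket, "part", 0, 0, ".avro")); // part-0-0.avro
        System.out.println(nextPartNameFixed(bucket, "part", 0, 0, ".avro")); // part-0-3.avro
    }
}
```

With a final, a pending and an in-progress file already in the bucket, the first variant returns `part-0-0.avro`, a name that is already taken (the collision behind the reported FileAlreadyExistsException), while the second variant skips ahead to `part-0-3.avro`.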



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
