[ 
https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667492#comment-15667492
 ] 

ASF GitHub Bot commented on FLINK-5056:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2797#discussion_r88043878
  
    --- Diff: 
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
    @@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState<T> 
bucketState) throws Exception {
     
        /**
         * Gets the truncate() call using reflection.
    -    *
         * <p>
    -    * Note: This code comes from Flume
    +    * <b>NOTE:</b> This code comes from Flume.
         */
        private Method reflectTruncate(FileSystem fs) {
    -           Method m = null;
    -           if(fs != null) {
    -                   Class<?> fsClass = fs.getClass();
    -                   try {
    -                           m = fsClass.getMethod("truncate", Path.class, 
long.class);
    -                   } catch (NoSuchMethodException ex) {
    -                           LOG.debug("Truncate not found. Will write a 
file with suffix '{}' " +
    -                                   " and prefix '{}' to specify how many 
bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
    -                           return null;
    -                   }
    +           if (this.refTruncate == null) {
    +                   Method m = null;
    +                   if (fs != null) {
    +                           Class<?> fsClass = fs.getClass();
    +                           try {
    +                                   m = fsClass.getMethod("truncate", 
Path.class, long.class);
    +                           } catch (NoSuchMethodException ex) {
    +                                   LOG.debug("Truncate not found. Will 
write a file with suffix '{}' " +
    +                                           " and prefix '{}' to specify 
how many bytes in a bucket are valid.",
    +                                           validLengthSuffix, 
validLengthPrefix);
    +                                   return null;
    +                           }
    +
    +                           // verify that truncate actually works
    +                           FSDataOutputStream outputStream;
    +                           Path testPath = new 
Path(UUID.randomUUID().toString());
    +                           try {
    +                                   outputStream = fs.create(testPath);
    +                                   outputStream.writeUTF("hello");
    +                                   outputStream.close();
    +                           } catch (IOException e) {
    +                                   LOG.error("Could not create file for 
checking if truncate works.", e);
    +                                   throw new RuntimeException("Could not 
create file for checking if truncate works.", e);
    +                           }
     
    +                           try {
    +                                   m.invoke(fs, testPath, 2);
    +                           } catch (IllegalAccessException | 
InvocationTargetException e) {
    +                                   LOG.debug("Truncate is not supported.", 
e);
    +                                   m = null;
    +                           }
     
    -                   // verify that truncate actually works
    -                   FSDataOutputStream outputStream;
    -                   Path testPath = new Path(UUID.randomUUID().toString());
    -                   try {
    -                           outputStream = fs.create(testPath);
    -                           outputStream.writeUTF("hello");
    -                           outputStream.close();
    -                   } catch (IOException e) {
    -                           LOG.error("Could not create file for checking 
if truncate works.", e);
    -                           throw new RuntimeException("Could not create 
file for checking if truncate works.", e);
    +                           try {
    +                                   fs.delete(testPath, false);
    +                           } catch (IOException e) {
    +                                   LOG.error("Could not delete truncate 
test file.", e);
    +                                   throw new RuntimeException("Could not 
delete truncate test file.", e);
    +                           }
                        }
    +                   this.refTruncate = m;
    +           }
    +           return this.refTruncate;
    +   }
     
    +   private Path getPendingPathFor(Path path) {
    +           return new Path(path.getParent(), pendingPrefix + 
path.getName()).suffix(pendingSuffix);
    +   }
     
    -                   try {
    -                           m.invoke(fs, testPath, 2);
    -                   } catch (IllegalAccessException | 
InvocationTargetException e) {
    -                           LOG.debug("Truncate is not supported.", e);
    -                           m = null;
    -                   }
    +   private Path getInProgressPathFor(Path path) {
    +           return new Path(path.getParent(), inProgressPrefix + 
path.getName()).suffix(inProgressSuffix);
    +   }
     
    -                   try {
    -                           fs.delete(testPath, false);
    -                   } catch (IOException e) {
    -                           LOG.error("Could not delete truncate test 
file.", e);
    -                           throw new RuntimeException("Could not delete 
truncate test file.", e);
    -                   }
    -           }
    -           return m;
    +   private Path getValidLengthPathFor(Path path) {
    +           return new Path(path.getParent(), validLengthPrefix + 
path.getName()).suffix(validLengthSuffix);
        }
     
        @Override
        public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
                synchronized (state.bucketStates) {
    -                   Iterator<Map.Entry<String, BucketState<T>>> it = 
state.bucketStates.entrySet().iterator();
    -                   while (it.hasNext()) {
    -                           BucketState<T> bucketState = 
it.next().getValue();
    +                   Iterator<Map.Entry<String, BucketState<T>>> stateIt = 
state.bucketStates.entrySet().iterator();
    +                   while (stateIt.hasNext()) {
    +                           BucketState<T> bucketState = 
stateIt.next().getValue();
                                synchronized 
(bucketState.pendingFilesPerCheckpoint) {
    -                                   Set<Long> pastCheckpointIds = 
bucketState.pendingFilesPerCheckpoint.keySet();
    -                                   Set<Long> checkpointsToRemove = new 
HashSet<>();
    -                                   for (Long pastCheckpointId : 
pastCheckpointIds) {
    +
    +                                   Iterator<Map.Entry<Long, List<String>>> 
pendingIt =
    +                                           
bucketState.pendingFilesPerCheckpoint.entrySet().iterator();
    +
    +                                   while (pendingIt.hasNext()) {
    +
    +                                           Map.Entry<Long, List<String>> 
entry = pendingIt.next();
    +                                           Long pastCheckpointId = 
entry.getKey();
    +                                           List<String> pendingPaths = 
entry.getValue();
    +
                                                if (pastCheckpointId <= 
checkpointId) {
                                                        LOG.debug("Moving 
pending files to final location for checkpoint {}", pastCheckpointId);
    -                                                   // All the pending 
files are buckets that have been completed but are waiting to be renamed
    -                                                   // to their final name
    -                                                   for (String filename : 
bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) {
    +
    +                                                   for (String filename : 
pendingPaths) {
                                                                Path finalPath 
= new Path(filename);
    -                                                           Path 
pendingPath = new Path(finalPath.getParent(),
    -                                                                   
pendingPrefix + finalPath.getName()).suffix(pendingSuffix);
    +                                                           Path 
pendingPath = getPendingPathFor(finalPath);
     
                                                                
fs.rename(pendingPath, finalPath);
                                                                LOG.debug(
                                                                        "Moving 
pending file {} to final location having completed checkpoint {}.",
                                                                        
pendingPath,
                                                                        
pastCheckpointId);
                                                        }
    -                                                   
checkpointsToRemove.add(pastCheckpointId);
    +                                                   pendingIt.remove();
                                                }
                                        }
    -                                   if (!bucketState.isWriterOpen && 
bucketState.pendingFiles.isEmpty()) {
    +
    +                                   if (!bucketState.isWriterOpen &&
    +                                           
bucketState.pendingFiles.isEmpty()) {
    +
                                                // We've dealt with all the 
pending files and the writer for this bucket is not currently open.
                                                // Therefore this bucket is 
currently inactive and we can remove it from our state.
    -                                           it.remove();
    -                                   } else {
    -                                           for (Long toRemove : 
checkpointsToRemove) {
    -                                                   
bucketState.pendingFilesPerCheckpoint.remove(toRemove);
    -                                           }
    +                                           stateIt.remove();
                                        }
                                }
                        }
                }
        }
     
        @Override
    -   public State<T> snapshotState(long checkpointId, long 
checkpointTimestamp) throws Exception {
    +   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
    +           Preconditions.checkNotNull(restoredBucketStates, "The operator 
has not been properly initialized.");
    +
    +           restoredBucketStates.clear();
    +
                synchronized (state.bucketStates) {
    -                   for (BucketState<T> bucketState : 
state.bucketStates.values()) {
    +                   int subtaskIdx = 
getRuntimeContext().getIndexOfThisSubtask();
    +                   
    +                   for (Map.Entry<String, BucketState<T>> bucketStateEntry 
: state.bucketStates.entrySet()) {
    +                           BucketState<T> bucketState = 
bucketStateEntry.getValue();
    +
                                if (bucketState.isWriterOpen) {
    -                                   long pos = bucketState.writer.flush();
    -                                   bucketState.currentFileValidLength = 
pos;
    +                                   bucketState.currentFileValidLength = 
bucketState.writer.flush();
                                }
    +
                                synchronized 
(bucketState.pendingFilesPerCheckpoint) {
    -                                   
bucketState.pendingFilesPerCheckpoint.put(checkpointId, 
bucketState.pendingFiles);
    +                                   
bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), 
bucketState.pendingFiles);
                                }
                                bucketState.pendingFiles = new ArrayList<>();
                        }
    +                   restoredBucketStates.add(state);
    +
    +                   if (LOG.isDebugEnabled()) {
    --- End diff --
    
    you don't have to check this.


> BucketingSink deletes valid data when checkpoint notification is slow.
> ----------------------------------------------------------------------
>
>                 Key: FLINK-5056
>                 URL: https://issues.apache.org/jira/browse/FLINK-5056
>             Project: Flink
>          Issue Type: Bug
>          Components: filesystem-connector
>    Affects Versions: 1.1.3
>            Reporter: Kostas Kloudas
>            Assignee: Kostas Kloudas
>             Fix For: 1.2.0
>
>
> Currently if BucketingSink receives no data after a checkpoint and then a 
> notification about a previous checkpoint arrives, it clears its state. This 
> can 
> lead to not committing valid data about intermediate checkpoints for whom
> a notification has not arrived yet. As a simple sequence that illustrates the 
> problem:
> -> input data 
> -> snapshot(0) 
> -> input data
> -> snapshot(1)
> -> no data
> -> notifyCheckpointComplete(0)
> the last will clear the state of the Sink without committing as final the 
> data 
> that arrived for checkpoint 1.
>  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to