[ https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667492#comment-15667492 ]
ASF GitHub Bot commented on FLINK-5056: --------------------------------------- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2797#discussion_r88043878 --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception { /** * Gets the truncate() call using reflection. - * * <p> - * Note: This code comes from Flume + * <b>NOTE:</b> This code comes from Flume. */ private Method reflectTruncate(FileSystem fs) { - Method m = null; - if(fs != null) { - Class<?> fsClass = fs.getClass(); - try { - m = fsClass.getMethod("truncate", Path.class, long.class); - } catch (NoSuchMethodException ex) { - LOG.debug("Truncate not found. Will write a file with suffix '{}' " + - " and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix); - return null; - } + if (this.refTruncate == null) { + Method m = null; + if (fs != null) { + Class<?> fsClass = fs.getClass(); + try { + m = fsClass.getMethod("truncate", Path.class, long.class); + } catch (NoSuchMethodException ex) { + LOG.debug("Truncate not found. 
Will write a file with suffix '{}' " + + " and prefix '{}' to specify how many bytes in a bucket are valid.", + validLengthSuffix, validLengthPrefix); + return null; + } + + // verify that truncate actually works + FSDataOutputStream outputStream; + Path testPath = new Path(UUID.randomUUID().toString()); + try { + outputStream = fs.create(testPath); + outputStream.writeUTF("hello"); + outputStream.close(); + } catch (IOException e) { + LOG.error("Could not create file for checking if truncate works.", e); + throw new RuntimeException("Could not create file for checking if truncate works.", e); + } + try { + m.invoke(fs, testPath, 2); + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.debug("Truncate is not supported.", e); + m = null; + } - // verify that truncate actually works - FSDataOutputStream outputStream; - Path testPath = new Path(UUID.randomUUID().toString()); - try { - outputStream = fs.create(testPath); - outputStream.writeUTF("hello"); - outputStream.close(); - } catch (IOException e) { - LOG.error("Could not create file for checking if truncate works.", e); - throw new RuntimeException("Could not create file for checking if truncate works.", e); + try { + fs.delete(testPath, false); + } catch (IOException e) { + LOG.error("Could not delete truncate test file.", e); + throw new RuntimeException("Could not delete truncate test file.", e); + } } + this.refTruncate = m; + } + return this.refTruncate; + } + private Path getPendingPathFor(Path path) { + return new Path(path.getParent(), pendingPrefix + path.getName()).suffix(pendingSuffix); + } - try { - m.invoke(fs, testPath, 2); - } catch (IllegalAccessException | InvocationTargetException e) { - LOG.debug("Truncate is not supported.", e); - m = null; - } + private Path getInProgressPathFor(Path path) { + return new Path(path.getParent(), inProgressPrefix + path.getName()).suffix(inProgressSuffix); + } - try { - fs.delete(testPath, false); - } catch (IOException e) { - 
LOG.error("Could not delete truncate test file.", e); - throw new RuntimeException("Could not delete truncate test file.", e); - } - } - return m; + private Path getValidLengthPathFor(Path path) { + return new Path(path.getParent(), validLengthPrefix + path.getName()).suffix(validLengthSuffix); } @Override public void notifyCheckpointComplete(long checkpointId) throws Exception { synchronized (state.bucketStates) { - Iterator<Map.Entry<String, BucketState<T>>> it = state.bucketStates.entrySet().iterator(); - while (it.hasNext()) { - BucketState<T> bucketState = it.next().getValue(); + Iterator<Map.Entry<String, BucketState<T>>> stateIt = state.bucketStates.entrySet().iterator(); + while (stateIt.hasNext()) { + BucketState<T> bucketState = stateIt.next().getValue(); synchronized (bucketState.pendingFilesPerCheckpoint) { - Set<Long> pastCheckpointIds = bucketState.pendingFilesPerCheckpoint.keySet(); - Set<Long> checkpointsToRemove = new HashSet<>(); - for (Long pastCheckpointId : pastCheckpointIds) { + + Iterator<Map.Entry<Long, List<String>>> pendingIt = + bucketState.pendingFilesPerCheckpoint.entrySet().iterator(); + + while (pendingIt.hasNext()) { + + Map.Entry<Long, List<String>> entry = pendingIt.next(); + Long pastCheckpointId = entry.getKey(); + List<String> pendingPaths = entry.getValue(); + if (pastCheckpointId <= checkpointId) { LOG.debug("Moving pending files to final location for checkpoint {}", pastCheckpointId); - // All the pending files are buckets that have been completed but are waiting to be renamed - // to their final name - for (String filename : bucketState.pendingFilesPerCheckpoint.get(pastCheckpointId)) { + + for (String filename : pendingPaths) { Path finalPath = new Path(filename); - Path pendingPath = new Path(finalPath.getParent(), - pendingPrefix + finalPath.getName()).suffix(pendingSuffix); + Path pendingPath = getPendingPathFor(finalPath); fs.rename(pendingPath, finalPath); LOG.debug( "Moving pending file {} to final location having 
completed checkpoint {}.", pendingPath, pastCheckpointId); } - checkpointsToRemove.add(pastCheckpointId); + pendingIt.remove(); } } - if (!bucketState.isWriterOpen && bucketState.pendingFiles.isEmpty()) { + + if (!bucketState.isWriterOpen && + bucketState.pendingFiles.isEmpty()) { + // We've dealt with all the pending files and the writer for this bucket is not currently open. // Therefore this bucket is currently inactive and we can remove it from our state. - it.remove(); - } else { - for (Long toRemove : checkpointsToRemove) { - bucketState.pendingFilesPerCheckpoint.remove(toRemove); - } + stateIt.remove(); } } } } } @Override - public State<T> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + public void snapshotState(FunctionSnapshotContext context) throws Exception { + Preconditions.checkNotNull(restoredBucketStates, "The operator has not been properly initialized."); + + restoredBucketStates.clear(); + synchronized (state.bucketStates) { - for (BucketState<T> bucketState : state.bucketStates.values()) { + int subtaskIdx = getRuntimeContext().getIndexOfThisSubtask(); + + for (Map.Entry<String, BucketState<T>> bucketStateEntry : state.bucketStates.entrySet()) { + BucketState<T> bucketState = bucketStateEntry.getValue(); + if (bucketState.isWriterOpen) { - long pos = bucketState.writer.flush(); - bucketState.currentFileValidLength = pos; + bucketState.currentFileValidLength = bucketState.writer.flush(); } + synchronized (bucketState.pendingFilesPerCheckpoint) { - bucketState.pendingFilesPerCheckpoint.put(checkpointId, bucketState.pendingFiles); + bucketState.pendingFilesPerCheckpoint.put(context.getCheckpointId(), bucketState.pendingFiles); } bucketState.pendingFiles = new ArrayList<>(); } + restoredBucketStates.add(state); + + if (LOG.isDebugEnabled()) { --- End diff -- you don't have to check this. > BucketingSink deletes valid data when checkpoint notification is slow. 
> ---------------------------------------------------------------------- > > Key: FLINK-5056 > URL: https://issues.apache.org/jira/browse/FLINK-5056 > Project: Flink > Issue Type: Bug > Components: filesystem-connector > Affects Versions: 1.1.3 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently if BucketingSink receives no data after a checkpoint and then a > notification about a previous checkpoint arrives, it clears its state. This > can > lead to not committing valid data about intermediate checkpoints for which > a notification has not arrived yet. As a simple sequence that illustrates the > problem: > -> input data > -> snapshot(0) > -> input data > -> snapshot(1) > -> no data > -> notifyCheckpointComplete(0) > the last notification will clear the state of the Sink without committing as final the > data > that arrived for checkpoint 1. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)