[ https://issues.apache.org/jira/browse/FLINK-5056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15667672#comment-15667672 ]
ASF GitHub Bot commented on FLINK-5056: --------------------------------------- Github user kl0u commented on a diff in the pull request: https://github.com/apache/flink/pull/2797#discussion_r88063735 --- Diff: flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -570,284 +616,277 @@ private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception { /** * Gets the truncate() call using reflection. - * * <p> - * Note: This code comes from Flume + * <b>NOTE:</b> This code comes from Flume. */ private Method reflectTruncate(FileSystem fs) { - Method m = null; - if(fs != null) { - Class<?> fsClass = fs.getClass(); - try { - m = fsClass.getMethod("truncate", Path.class, long.class); - } catch (NoSuchMethodException ex) { - LOG.debug("Truncate not found. Will write a file with suffix '{}' " + - " and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix); - return null; - } + if (this.refTruncate == null) { + Method m = null; + if (fs != null) { + Class<?> fsClass = fs.getClass(); + try { + m = fsClass.getMethod("truncate", Path.class, long.class); + } catch (NoSuchMethodException ex) { + LOG.debug("Truncate not found. 
Will write a file with suffix '{}' " + + " and prefix '{}' to specify how many bytes in a bucket are valid.", + validLengthSuffix, validLengthPrefix); + return null; + } + + // verify that truncate actually works + FSDataOutputStream outputStream; + Path testPath = new Path(UUID.randomUUID().toString()); + try { + outputStream = fs.create(testPath); + outputStream.writeUTF("hello"); + outputStream.close(); + } catch (IOException e) { + LOG.error("Could not create file for checking if truncate works.", e); + throw new RuntimeException("Could not create file for checking if truncate works.", e); + } + try { + m.invoke(fs, testPath, 2); + } catch (IllegalAccessException | InvocationTargetException e) { + LOG.debug("Truncate is not supported.", e); + m = null; + } - // verify that truncate actually works - FSDataOutputStream outputStream; - Path testPath = new Path(UUID.randomUUID().toString()); - try { - outputStream = fs.create(testPath); - outputStream.writeUTF("hello"); - outputStream.close(); - } catch (IOException e) { - LOG.error("Could not create file for checking if truncate works.", e); - throw new RuntimeException("Could not create file for checking if truncate works.", e); + try { + fs.delete(testPath, false); + } catch (IOException e) { + LOG.error("Could not delete truncate test file.", e); + throw new RuntimeException("Could not delete truncate test file.", e); + } } + this.refTruncate = m; + } + return this.refTruncate; + } + private Path getPendingPathFor(Path path) { --- End diff -- what will this save? > BucketingSink deletes valid data when checkpoint notification is slow. 
> ---------------------------------------------------------------------- > > Key: FLINK-5056 > URL: https://issues.apache.org/jira/browse/FLINK-5056 > Project: Flink > Issue Type: Bug > Components: filesystem-connector > Affects Versions: 1.1.3 > Reporter: Kostas Kloudas > Assignee: Kostas Kloudas > Fix For: 1.2.0 > > > Currently, if BucketingSink receives no data after a checkpoint and then a > notification about a previous checkpoint arrives, it clears its state. This > can > lead to not committing valid data about intermediate checkpoints for which > a notification has not arrived yet. As a simple sequence that illustrates the > problem: > -> input data > -> snapshot(0) > -> input data > -> snapshot(1) > -> no data > -> notifyCheckpointComplete(0) > the last step will clear the state of the Sink without committing as final the > data > that arrived for checkpoint 1. > -- This message was sent by Atlassian JIRA (v6.3.4#6332)