GitHub user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/2797#discussion_r88208586
--- Diff:
flink-streaming-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
---
@@ -570,139 +563,165 @@ private void closeCurrentPartFile(BucketState<T> bucketState) throws Exception {
/**
* Gets the truncate() call using reflection.
- *
* <p>
- * Note: This code comes from Flume
+ * <b>NOTE:</b> This code comes from Flume.
*/
private Method reflectTruncate(FileSystem fs) {
- Method m = null;
- if(fs != null) {
- Class<?> fsClass = fs.getClass();
- try {
- m = fsClass.getMethod("truncate", Path.class, long.class);
- } catch (NoSuchMethodException ex) {
- LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
- " and prefix '{}' to specify how many bytes in a bucket are valid.", validLengthSuffix, validLengthPrefix);
- return null;
- }
+ if (this.refTruncate == null) {
+ Method m = null;
+ if (fs != null) {
+ Class<?> fsClass = fs.getClass();
+ try {
+ m = fsClass.getMethod("truncate", Path.class, long.class);
+ } catch (NoSuchMethodException ex) {
+ LOG.debug("Truncate not found. Will write a file with suffix '{}' " +
+ " and prefix '{}' to specify how many bytes in a bucket are valid.",
+ validLengthSuffix, validLengthPrefix);
+ return null;
+ }
+ // verify that truncate actually works
+ FSDataOutputStream outputStream;
+ Path testPath = new Path(UUID.randomUUID().toString());
+ try {
+ outputStream = fs.create(testPath);
+ outputStream.writeUTF("hello");
+ outputStream.close();
+ } catch (IOException e) {
+ LOG.error("Could not create file for checking if truncate works.", e);
+ throw new RuntimeException("Could not create file for checking if truncate works.", e);
+ }
- // verify that truncate actually works
- FSDataOutputStream outputStream;
- Path testPath = new Path(UUID.randomUUID().toString());
- try {
- outputStream = fs.create(testPath);
- outputStream.writeUTF("hello");
- outputStream.close();
- } catch (IOException e) {
- LOG.error("Could not create file for checking if truncate works.", e);
- throw new RuntimeException("Could not create file for checking if truncate works.", e);
+ try {
+ m.invoke(fs, testPath, 2);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ LOG.debug("Truncate is not supported.", e);
+ m = null;
+ }
+
+ try {
+ fs.delete(testPath, false);
+ } catch (IOException e) {
+ LOG.error("Could not delete truncate test file.", e);
+ throw new RuntimeException("Could not delete truncate test file.", e);
+ }
}
+ this.refTruncate = m;
--- End diff --
I'm not so sure about this assignment; it seems very odd to both assign the
method to a field and return it, since the returned value will typically be
assigned to that very same field. The contract should be cleaned up so that
the method either returns the value or assigns it, not both.
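
For illustration, one possible shape of the "return only" variant is sketched
below. This is a simplified, hypothetical example and not the actual
BucketingSink code: the names TruncateLookupSketch, FakeFs and initFileSystem
are made up, a String stands in for Hadoop's Path, and the write/truncate/delete
probe from the diff is left out. The lookup method only returns its result, and
the caller alone decides whether to cache it in the field.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the sink; not the real BucketingSink.
class TruncateLookupSketch {

    // Hypothetical stand-in for a FileSystem that offers truncate().
    static class FakeFs {
        public boolean truncate(String path, long newLength) {
            return true;
        }
    }

    // Cached result of the reflective lookup; written only in initFileSystem().
    private Method refTruncate;

    // Pure lookup: returns the truncate() method or null, and touches no fields.
    static Method reflectTruncate(Object fs) {
        if (fs == null) {
            return null;
        }
        try {
            return fs.getClass().getMethod("truncate", String.class, long.class);
        } catch (NoSuchMethodException e) {
            // truncate() is not available on this file system
            return null;
        }
    }

    // The caller owns the caching: the field is assigned in exactly one place.
    // Note that a null result is not cached, so the lookup is repeated
    // on the next call if truncate() was not found.
    void initFileSystem(Object fs) {
        if (refTruncate == null) {
            refTruncate = reflectTruncate(fs);
        }
    }

    Method getRefTruncate() {
        return refTruncate;
    }
}
```

The alternative would be to keep the caching inside the method and make it
void, so that callers read the field instead of a return value. Either way the
field is written in a single place.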
---