liufangqi commented on a change in pull request #17958: URL: https://github.com/apache/flink/pull/17958#discussion_r761672254
########## File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java ########## @@ -247,4 +248,43 @@ private static boolean addHadoopConfIfFound( } return foundHadoopConfiguration; } + + /** + * Set up the caller context [[callerContext]] by invoking Hadoop CallerContext API of + * [[org.apache.hadoop.ipc.CallerContext]], which was added in hadoop 2.8. + */ + public static void setCallerContext(String callerContext, + org.apache.flink.configuration.Configuration flinkConfig) { + if (isMinHadoopVersion(2,8)) { + try { + callerContext = truncateCallerContext(callerContext, flinkConfig); + Class callerContextClass = Class.forName("org.apache.hadoop.ipc.CallerContext"); + Class builder = Class.forName("org.apache.hadoop.ipc.CallerContext$Builder"); + Constructor builderInst = builder.getConstructor(callerContext.getClass()); + callerContextClass.getMethod("setCurrent", callerContextClass) + .invoke(null, builder.getMethod("build") + .invoke(builderInst.newInstance(callerContext))); Review comment: I know it is a graceful way to do this feature. But as you see, that's probably a big effort for this feature. Above all, I think reflection is a transitional way to do this work. And I can do the Hadoop version bumping after this feature, it may take some time, but i think i can take this ticket. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org