[ https://issues.apache.org/jira/browse/BEAM-3371?focusedWorklogId=145651&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145651 ]
ASF GitHub Bot logged work on BEAM-3371: ---------------------------------------- Author: ASF GitHub Bot Created on: 19/Sep/18 13:13 Start Date: 19/Sep/18 13:13 Worklog Time Spent: 10m Work Description: mxm commented on a change in pull request #6244: [BEAM-3371] Enable running integration tests on Spark URL: https://github.com/apache/beam/pull/6244#discussion_r218792264 ########## File path: runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java ########## @@ -95,73 +83,33 @@ public void translate(Pipeline pipeline) { pipeline.replaceAll( FlinkTransformOverrides.getDefaultOverrides(translationMode == TranslationMode.STREAMING)); - // Local flink configurations work in the same JVM and have no problems with improperly - // formatted files on classpath (eg. directories with .class files or empty directories). - // prepareFilesToStage() only when using remote flink cluster. - List<String> filesToStage; - if (!options.getFlinkMaster().matches("\\[.*\\]")) { - filesToStage = prepareFilesToStage(); - } else { - filesToStage = options.getFilesToStage(); - } + prepareFilesToStageForRemoteClusterExecution(); FlinkPipelineTranslator translator; if (translationMode == TranslationMode.STREAMING) { this.flinkStreamEnv = - FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, filesToStage); + FlinkExecutionEnvironments.createStreamExecutionEnvironment( + options, options.getFilesToStage()); translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, options); } else { this.flinkBatchEnv = - FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, filesToStage); + FlinkExecutionEnvironments.createBatchExecutionEnvironment( + options, options.getFilesToStage()); translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options); } translator.translate(pipeline); } - private List<String> prepareFilesToStage() { - return options - .getFilesToStage() - .stream() - .map(File::new) - .filter(File::exists) - 
.map(file -> file.isDirectory() ? packageDirectoriesToStage(file) : file.getAbsolutePath()) - .collect(Collectors.toList()); - } - - private String packageDirectoriesToStage(File directoryToStage) { - String hash = calculateDirectoryContentHash(directoryToStage); - String pathForJar = getUniqueJarPath(hash); - zipDirectory(directoryToStage, pathForJar); - return pathForJar; - } - - private String calculateDirectoryContentHash(File directoryToStage) { - Hasher hasher = Hashing.md5().newHasher(); - try (OutputStream hashStream = Funnels.asOutputStream(hasher)) { - ZipFiles.zipDirectory(directoryToStage, hashStream); - return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - private String getUniqueJarPath(String contentHash) { - String tempLocation = options.getTempLocation(); - - checkArgument( - !Strings.isNullOrEmpty(tempLocation), - "Please provide \"tempLocation\" pipeline option. Flink runner needs it to store jars " - + "made of directories that were on classpath."); - - return String.format("%s%s.jar", tempLocation, contentHash); - } - - private void zipDirectory(File directoryToStage, String uniqueDirectoryPath) { - try { - ZipFiles.zipDirectory(directoryToStage, new FileOutputStream(uniqueDirectoryPath)); - } catch (IOException e) { - throw new RuntimeException(e); + /** + * Local configurations work in the same JVM and have no problems with improperly formatted files + * on classpath (eg. directories with .class files or empty directories). Prepare files for + * staging only when using remote cluster. + */ + private void prepareFilesToStageForRemoteClusterExecution() { Review comment: Would make this static and pass the options explicitly. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 145651) Time Spent: 3h 10m (was: 3h) > Add ability to stage directories with compiled classes to Spark > --------------------------------------------------------------- > > Key: BEAM-3371 > URL: https://issues.apache.org/jira/browse/BEAM-3371 > Project: Beam > Issue Type: Sub-task > Components: runner-spark > Reporter: Lukasz Gajowy > Assignee: Jean-Baptiste Onofré > Priority: Minor > Time Spent: 3h 10m > Remaining Estimate: 0h > > This one is basically the same issue as > [this Flink one|https://issues.apache.org/jira/browse/BEAM-3370], except > for two things: > - a detection of files to stage has to be provided in Spark, which is already > being developed [here|https://issues.apache.org/jira/browse/BEAM-981] > - the test execution is not interrupted by FileNotFoundException but by *the > effect* of the directory not being staged (absence of test classes on > Spark's classpath, hence ClassNotFoundException). > Again, this probably could be resolved analogously as in Flink, once the > BEAM-981 issue is resolved. -- This message was sent by Atlassian JIRA (v7.6.3#76005)