[ 
https://issues.apache.org/jira/browse/BEAM-3371?focusedWorklogId=145651&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-145651
 ]

ASF GitHub Bot logged work on BEAM-3371:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Sep/18 13:13
            Start Date: 19/Sep/18 13:13
    Worklog Time Spent: 10m 
      Work Description: mxm commented on a change in pull request #6244: 
[BEAM-3371] Enable running integration tests on Spark
URL: https://github.com/apache/beam/pull/6244#discussion_r218792264
 
 

 ##########
 File path: 
runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java
 ##########
 @@ -95,73 +83,33 @@ public void translate(Pipeline pipeline) {
     pipeline.replaceAll(
         FlinkTransformOverrides.getDefaultOverrides(translationMode == 
TranslationMode.STREAMING));
 
-    // Local flink configurations work in the same JVM and have no problems 
with improperly
-    // formatted files on classpath (eg. directories with .class files or 
empty directories).
-    // prepareFilesToStage() only when using remote flink cluster.
-    List<String> filesToStage;
-    if (!options.getFlinkMaster().matches("\\[.*\\]")) {
-      filesToStage = prepareFilesToStage();
-    } else {
-      filesToStage = options.getFilesToStage();
-    }
+    prepareFilesToStageForRemoteClusterExecution();
 
     FlinkPipelineTranslator translator;
     if (translationMode == TranslationMode.STREAMING) {
       this.flinkStreamEnv =
-          FlinkExecutionEnvironments.createStreamExecutionEnvironment(options, 
filesToStage);
+          FlinkExecutionEnvironments.createStreamExecutionEnvironment(
+              options, options.getFilesToStage());
       translator = new FlinkStreamingPipelineTranslator(flinkStreamEnv, 
options);
     } else {
       this.flinkBatchEnv =
-          FlinkExecutionEnvironments.createBatchExecutionEnvironment(options, 
filesToStage);
+          FlinkExecutionEnvironments.createBatchExecutionEnvironment(
+              options, options.getFilesToStage());
       translator = new FlinkBatchPipelineTranslator(flinkBatchEnv, options);
     }
 
     translator.translate(pipeline);
   }
 
-  private List<String> prepareFilesToStage() {
-    return options
-        .getFilesToStage()
-        .stream()
-        .map(File::new)
-        .filter(File::exists)
-        .map(file -> file.isDirectory() ? packageDirectoriesToStage(file) : 
file.getAbsolutePath())
-        .collect(Collectors.toList());
-  }
-
-  private String packageDirectoriesToStage(File directoryToStage) {
-    String hash = calculateDirectoryContentHash(directoryToStage);
-    String pathForJar = getUniqueJarPath(hash);
-    zipDirectory(directoryToStage, pathForJar);
-    return pathForJar;
-  }
-
-  private String calculateDirectoryContentHash(File directoryToStage) {
-    Hasher hasher = Hashing.md5().newHasher();
-    try (OutputStream hashStream = Funnels.asOutputStream(hasher)) {
-      ZipFiles.zipDirectory(directoryToStage, hashStream);
-      return Base64Variants.MODIFIED_FOR_URL.encode(hasher.hash().asBytes());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  private String getUniqueJarPath(String contentHash) {
-    String tempLocation = options.getTempLocation();
-
-    checkArgument(
-        !Strings.isNullOrEmpty(tempLocation),
-        "Please provide \"tempLocation\" pipeline option. Flink runner needs 
it to store jars "
-            + "made of directories that were on classpath.");
-
-    return String.format("%s%s.jar", tempLocation, contentHash);
-  }
-
-  private void zipDirectory(File directoryToStage, String uniqueDirectoryPath) 
{
-    try {
-      ZipFiles.zipDirectory(directoryToStage, new 
FileOutputStream(uniqueDirectoryPath));
-    } catch (IOException e) {
-      throw new RuntimeException(e);
+  /**
+   * Local configurations work in the same JVM and have no problems with 
improperly formatted files
+   * on classpath (eg. directories with .class files or empty directories). 
Prepare files for
+   * staging only when using remote cluster.
+   */
+  private void prepareFilesToStageForRemoteClusterExecution() {
 
 Review comment:
   Would make this static and pass the options explictly.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 145651)
    Time Spent: 3h 10m  (was: 3h)

> Add ability to stage directories with compiled classes to Spark
> ---------------------------------------------------------------
>
>                 Key: BEAM-3371
>                 URL: https://issues.apache.org/jira/browse/BEAM-3371
>             Project: Beam
>          Issue Type: Sub-task
>          Components: runner-spark
>            Reporter: Lukasz Gajowy
>            Assignee: Jean-Baptiste Onofré
>            Priority: Minor
>          Time Spent: 3h 10m
>  Remaining Estimate: 0h
>
> This one is basically the same issue as
>  [this Flink's one|https://issues.apache.org/jira/browse/BEAM-3370], except 
> of two things:
> - a detection of files to stage has to be provided in Spark, which is already 
> being developed [here|https://issues.apache.org/jira/browse/BEAM-981]
> - the test execution is not interrupted by FileNotFoundException but by *the 
> effect* of the directory not being staged (absence of test classes on the 
> Spark's classpath, hence ClassNotFoundException).
> Again, this probably could be resolved analogously as in flink, while 
> BEAM-981 issue is resolved. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to