I looked at Java's mechanism for creating temporary local files. I believe they can be created, written to, and passed to other programs on the system. I wrote a proof of concept that sends some Strings out, uses the local program cat to concatenate them, and writes the result to a local file. Clearly there is a more complex program I want to target, but is there anything wrong with this approach?
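One detail worth flagging before the code: the proof of concept builds a shell command string just to get the `>` redirection, which means spawning `/bin/sh` (or `cmd`) and worrying about quoting if a temp path ever contains spaces. ProcessBuilder can redirect stdout to a file directly, so the external program can be launched without any shell. A minimal standalone sketch (no Spark; the class name is mine, and it assumes a Unix-like system with `/bin/cat` available):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class CatNoShell {

    // Concatenate two strings by writing them to temp files and running
    // /bin/cat, letting ProcessBuilder handle the output redirection.
    public static String catFiles(String a, String b) throws IOException, InterruptedException {
        File f1 = File.createTempFile("cat-test", ".txt");
        File f2 = File.createTempFile("cat-test", ".txt");
        File f3 = File.createTempFile("cat-test", ".txt");
        try {
            Files.write(f1.toPath(), a.getBytes());
            Files.write(f2.toPath(), b.getBytes());
            ProcessBuilder pb = new ProcessBuilder("/bin/cat",
                    f1.getAbsolutePath(), f2.getAbsolutePath());
            pb.redirectOutput(f3); // stdout goes straight to f3 - no shell, no quoting issues
            pb.redirectError(ProcessBuilder.Redirect.INHERIT);
            Process p = pb.start();
            if (p.waitFor() != 0)
                throw new IOException("cat exited with a non-zero status");
            return new String(Files.readAllBytes(f3.toPath()));
        } finally {
            f1.delete();
            f2.delete();
            f3.delete();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(catFiles("Start 1 ", "End 2")); // prints "Start 1 End 2"
    }
}
```

This keeps the OS-specific branch out of the hot path entirely; only the path to the executable differs per platform.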
==========================================

package com.lordjoe.comet;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Option;
import scala.Tuple2;

import java.io.*;
import java.util.*;

/**
 * com.lordjoe.comet.SparkCatTest
 * Tests using Java temp files inside a Spark function call
 */
public class SparkCatTest {

    public static final int NUMBER_REPEATS = 10; // makes NUMBER_REPEATS * NUMBER_REPEATS pairs

    public static List<String> buildItems(String text, int repeats) {
        List<String> ret = new ArrayList<>();
        for (int i = 0; i < repeats; i++) {
            ret.add(text + i);
        }
        return ret;
    }

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("CatWithFiles");
        Option<String> option = sparkConf.getOption("spark.master");
        if (!option.isDefined()) { // use local over nothing
            sparkConf.setMaster("local[*]");
        }
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        List<String> start = buildItems("Start ", NUMBER_REPEATS); // data like "Start 9"
        List<String> end = buildItems("End ", NUMBER_REPEATS);     // data like "End 9"

        JavaRDD<String> startRdd = ctx.parallelize(start);
        JavaRDD<String> endRdd = ctx.parallelize(end);
        JavaPairRDD<String, String> cross = startRdd.cartesian(endRdd); // make all pairs

        /*
         * The dirty work is done here - temp files are used to perform cat
         */
        JavaRDD<String> map = cross.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> x) throws Exception {
                File f1 = makeTempFile();
                writeFile(f1, x._1);
                File f2 = makeTempFile();
                writeFile(f2, x._2);
                File f3 = makeTempFile();
                String ret = null;
                String command = "cat " + f1.getAbsolutePath() + " " + f2.getAbsolutePath() +
                        " > " + f3.getAbsolutePath();
                boolean success;
                if (osIsWindows()) // assumes a cat executable is on the PATH (e.g. from Git or Cygwin)
                    success = executeCommandLine("cmd", "/c", command);
                else
                    success = executeCommandLine("/bin/sh", "-c", command);
                if (success) {
                    ret = readFile(f3);
                }
                f1.delete();
                f2.delete();
                f3.delete();
                return ret;
            }
        });

        // note the list returned by collect is immutable so we need a copy
        List<String> collect = new ArrayList<>(map.collect());
        Collections.sort(collect);
        for (String s : collect) {
            System.out.println(s);
        }
    }

    /**
     * true if running on Windows - otherwise Linux is assumed
     */
    public static synchronized boolean osIsWindows() {
        String osName = System.getProperty("os.name").toLowerCase();
        return osName.contains("windows");
    }

    /**
     * make a temporary file with a unique name and delete it on exit
     * @return non-null file
     */
    public static File makeTempFile() throws IOException {
        String prefix = UUID.randomUUID().toString(); // unique name
        String suffix = ".txt";
        File tempFile = File.createTempFile(prefix, suffix);
        tempFile.deleteOnExit(); // drop on shutdown
        return tempFile;
    }

    public static boolean executeCommandLine(String... args) throws IOException, InterruptedException {
        ProcessBuilder p = new ProcessBuilder(args);
        Process process = p.start();
        int returnVal = process.waitFor(); // waitFor returns the exit value
        return returnVal == 0;
    }

    /**
     * write the string data to a file
     * @param f    file to create
     * @param data data to write
     */
    public static void writeFile(File f, String data) throws IOException {
        PrintWriter out = new PrintWriter(new FileWriter(f));
        out.print(data);
        out.close();
    }

    /**
     * read the contents of a text file
     * @param f file to read
     * @return contents of the file as a String
     */
    public static String readFile(File f) throws IOException {
        LineNumberReader rdr = new LineNumberReader(new FileReader(f));
        StringBuilder sb = new StringBuilder();
        String line = rdr.readLine();
        while (line != null) {
            sb.append(line);
            sb.append("\n");
            line = rdr.readLine();
        }
        rdr.close();
        return sb.toString();
    }
}

On Mon, Nov 12, 2018 at 9:20 AM Steve Lewis <lordjoe2...@gmail.com> wrote:

> I have been looking at Spark-Blast, which calls Blast - a well-known C++
> program - in parallel.
> In my case I have tried to translate the C++ code to Java but am not
> getting the same results - it is convoluted.
> I have code that will call the program and read its results - the only
> real issue is that the program wants local files.
> Their use is convoluted, with many seeks, so replacement with streaming
> will not work.
> As long as my Java code can write to a local file for the duration of one
> call, things can work.
>
> I considered in-memory files, as long as they can be passed to another
> program - I am willing to have OS-specific code.
> So my issue is I need to write 3 files, run a program, and read one output
> file - then all files can be deleted.
> JNI calls will be hard - this is a program, not a library, and it is
> available for
> worker nodes.
>
> On Sun, Nov 11, 2018 at 10:52 PM Jörn Franke <jornfra...@gmail.com> wrote:
>
>> Can you use JNI to call the C++ functionality directly from Java?
>>
>> Or you could wrap this into a MapReduce step outside Spark and use Hadoop
>> Streaming (it allows you to use shell scripts as mapper and reducer)?
>>
>> You can also write temporary files for each partition and execute the
>> software within a map step.
>>
>> Generally you should not call external applications from Spark.
>>
>> > On 11.11.2018, at 23:13, Steve Lewis <lordjoe2...@gmail.com> wrote:
>> >
>> > I have a problem where a critical step needs to be performed by a
>> > third-party C++ application. I can send or install this program on the
>> > worker nodes. I can construct a function holding all the data this
>> > program needs to process. The problem is that the program is designed
>> > to read and write from the local file system. I can call the program
>> > from Java and read its output as a local file, then delete all
>> > temporary files, but I doubt that it is possible to get the program to
>> > read from HDFS or any shared file system.
>> > My question is: can a function running on a worker node create
>> > temporary files and pass the names of these to a local process,
>> > assuming everything is cleaned up after the call?
>> >
>> > --
>> > Steven M. Lewis PhD
>> > 4221 105th Ave NE
>> > Kirkland, WA 98033
>> > 206-384-1340 (cell)
>> > Skype lordjoe_com

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
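[Editor's note] On the "everything is cleaned up after the call" question above: deleting three files individually leaks them if the external program throws between the writes and the deletes. One pattern is to give each call its own scratch directory and remove it recursively in a finally block. A plain-JDK sketch (class and file names are mine; the external-program invocation is elided):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

public class TempDirPerCall {

    // Run one external-program call with its own scratch directory,
    // guaranteeing cleanup even if the call throws.
    public static String withScratchDir(String in1, String in2) throws IOException {
        Path dir = Files.createTempDirectory("extern-call-"); // hypothetical prefix
        try {
            Path f1 = dir.resolve("input1.txt");
            Path f2 = dir.resolve("input2.txt");
            Files.write(f1, in1.getBytes());
            Files.write(f2, in2.getBytes());
            // ... launch the external program here with f1/f2 as arguments ...
            // stand-in for reading the program's output file:
            return new String(Files.readAllBytes(f1)) + new String(Files.readAllBytes(f2));
        } finally {
            // delete children before the directory itself (reverse path order)
            try (Stream<Path> walk = Files.walk(dir)) {
                walk.sorted(Comparator.reverseOrder())
                    .forEach(p -> p.toFile().delete());
            }
        }
    }
}
```

A per-call directory also sidesteps name collisions between concurrent tasks on the same worker, since each call only ever sees its own files.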