writing to local files on a worker
I have a problem where a critical step needs to be performed by a third-party C++ application. I can send or install this program on the worker nodes, and I can construct a function holding all the data this program needs to process. The problem is that the program is designed to read from and write to the local file system. I can call the program from Java and read its output as a local file, then delete all temporary files, but I doubt that it is possible to get the program to read from HDFS or any shared file system.

My question is: can a function running on a worker node create temporary files and pass their names to a local process, assuming everything is cleaned up after the call?

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
Re: writing to local files on a worker
Hello,

You could try the mapPartitions function if you can send partial data to your C++ program:

mapPartitions(func): similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.

That way you can write the partition's data to a temp file, call your C++ app, then delete the temp file. Of course, your data would be limited to the rows in one partition.

Also, the latest release of Spark (2.4.0) introduced barrier execution mode: https://issues.apache.org/jira/browse/SPARK-24374

Maybe you could combine the two. Using mapPartitions alone will give you single-partition data only, and your app call will be repeated on all nodes, not necessarily at the same time. Spark's strong point is parallel execution, so what you're trying to do somewhat defeats that. But if you do not need to combine all the data before calling your app, then you could do it. Or you could split your job into a Spark -> app -> Spark chain.

Good luck,
Joe

On 11/11/2018 02:13 PM, Steve Lewis wrote:
> I have a problem where a critical step needs to be performed by a third party c++ application. [...]
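The temp-file-per-partition approach Joe describes can be sketched as follows. This is a minimal sketch in pure JDK code: `cat` stands in for the real C++ binary, and the class and method names are illustrative, not part of any API.

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

public class PartitionToTempFile {
    // Write one partition's rows to a temp file, run an external command
    // on that file, and return the command's output lines as an iterator.
    public static Iterator<String> processPartition(Iterator<String> rows)
            throws IOException, InterruptedException {
        Path in = Files.createTempFile("part-in-", ".txt");
        Path out = Files.createTempFile("part-out-", ".txt");
        try {
            try (BufferedWriter w = Files.newBufferedWriter(in)) {
                while (rows.hasNext()) {
                    w.write(rows.next());
                    w.newLine();
                }
            }
            // "cat" stands in for the real C++ binary; redirectOutput
            // captures stdout into the temp output file without a shell.
            Process p = new ProcessBuilder("cat", in.toString())
                    .redirectOutput(out.toFile())
                    .start();
            if (p.waitFor() != 0) {
                throw new IOException("external tool exited with nonzero status");
            }
            return Files.readAllLines(out).iterator();
        } finally {
            Files.deleteIfExists(in);
            Files.deleteIfExists(out);
        }
    }
}
```

Hooked into Spark this could be used roughly as rdd.mapPartitions(PartitionToTempFile::processPartition), since FlatMapFunction's call takes an Iterator<T> and returns an Iterator<U>; note the iterator is materialized before the temp files are deleted, which is deliberate here.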
Re: writing to local files on a worker
Can you use JNI to call the C++ functionality directly from Java?

Or you could wrap this in an MR step outside Spark and use Hadoop Streaming (it allows you to use shell scripts as mapper and reducer).

You could also write temporary files for each partition and execute the software within a map step.

Generally, you should not call external applications from Spark.

On 11.11.2018 at 23:13, Steve Lewis wrote:
> I have a problem where a critical step needs to be performed by a third party c++ application. [...]
Re: writing to local files on a worker
I have been looking at Spark-Blast, which calls Blast (a well-known C++ program) in parallel. In my case I have tried to translate the C++ code to Java but am not getting the same results; the code is convoluted. I have code that will call the program and read its results. The only real issue is that the program wants local files; their use is convoluted, with many seeks, so replacement with streaming will not work. As long as my Java code can write to a local file for the duration of one call, things can work. I considered in-memory files, as long as they can be passed to another program, and I am willing to have OS-specific code.

So my issue is: I need to write three files, run a program, and read one output file; after that, all files can be deleted. JNI calls will be hard, since this is a program, not a library, and it is available for the worker nodes.

On Sun, Nov 11, 2018 at 10:52 PM Jörn Franke wrote:
> Can you use JNI to call the c++ functionality directly from Java? [...]
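The three-inputs / one-output pattern described above can be sketched with a plain ProcessBuilder wrapper. This is a hedged sketch: the tool path, the argument order, and the assumption that the program writes its result to stdout are all placeholders; the real binary may instead take an explicit output-file argument, in which case the redirect would be replaced by a fourth path argument.

```java
import java.io.IOException;
import java.nio.file.*;

public class ThreeInOneOut {
    // Hypothetical wrapper: write three inputs to a private temp directory,
    // invoke the external binary on them, read one output, clean everything up.
    public static String run(String toolPath, String in1, String in2, String in3)
            throws IOException, InterruptedException {
        Path dir = Files.createTempDirectory("step-");
        Path a = dir.resolve("a.txt");
        Path b = dir.resolve("b.txt");
        Path c = dir.resolve("c.txt");
        Path out = dir.resolve("out.txt");
        try {
            Files.write(a, in1.getBytes());
            Files.write(b, in2.getBytes());
            Files.write(c, in3.getBytes());
            // Assumed invocation: tool reads the three files, writes to stdout.
            Process p = new ProcessBuilder(toolPath, a.toString(), b.toString(), c.toString())
                    .redirectOutput(out.toFile())
                    .start();
            if (p.waitFor() != 0) {
                throw new IOException(toolPath + " exited with nonzero status");
            }
            return new String(Files.readAllBytes(out));
        } finally {
            for (Path f : new Path[]{a, b, c, out}) {
                Files.deleteIfExists(f);
            }
            Files.deleteIfExists(dir);
        }
    }
}
```

Using a per-call temp directory keeps concurrent tasks on the same node from colliding on file names, and the finally block guarantees cleanup even when the tool fails.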
Re: writing to local files on a worker
I looked at Java's mechanism for creating temporary local files. I believe they can be created, written to, and passed to other programs on the system. I wrote a proof of concept to send some Strings out, use the local program cat to concatenate them, and write the result to a local file. Clearly there is a more complex program I want to target, but is there anything wrong with this approach?
==
package com.lordjoe.comet;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Option;
import scala.Tuple2;

import java.io.*;
import java.util.*;

/**
 * com.lordjoe.comet.SparkCatTest
 * Tests using Java temp files in a function call
 */
public class SparkCatTest {
    public static final int NUMBER_REPEATS = 10; // make NUMBER_REPEATS * NUMBER_REPEATS pairs

    public static List<String> buildItems(String text, int repeats) {
        List<String> ret = new ArrayList<>();
        for (int i = 0; i < repeats; i++) {
            ret.add(text + i);
        }
        return ret;
    }

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("CatWithFiles");
        Option<String> option = sparkConf.getOption("spark.master");
        if (!option.isDefined()) { // use local over nothing
            sparkConf.setMaster("local[*]");
        }
        JavaSparkContext ctx = new JavaSparkContext(sparkConf);

        List<String> start = buildItems("Start ", NUMBER_REPEATS); // make some data like Start 9
        List<String> end = buildItems("End ", NUMBER_REPEATS);     // make some data like End 9
        JavaRDD<String> startRdd = ctx.parallelize(start);
        JavaRDD<String> endRdd = ctx.parallelize(end);
        JavaPairRDD<String, String> cross = startRdd.cartesian(endRdd); // make all pairs

        /**
         * dirty work is done here - uses files to perform cat
         */
        JavaRDD<String> map = cross.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> x) throws Exception {
                File f1 = makeTempFile();
                writeFile(f1, x._1);
                File f2 = makeTempFile();
                writeFile(f2, x._2);
                File f3 = makeTempFile();
                boolean success = false;
                String ret = null;
                String f1path = f1.getAbsolutePath();
                String f2path = f2.getAbsolutePath();
                String f3Path = f3.getAbsolutePath();
                String command = "cat " + f1path + " " + f2path + " > " + f3Path;
                if (osIsWindows())
                    success = executeCommandLine("cmd", "/c", command);
                else
                    success = executeCommandLine("/bin/sh", "-c", command);
                if (success) {
                    ret = readFile(f3);
                }
                f1.delete();
                f2.delete();
                f3.delete();
                return ret;
            }
        });

        // note the list returned by collect is immutable so we need a copy
        List<String> collect = new ArrayList<>(map.collect());
        Collections.sort(collect);
        for (String s : collect) {
            System.out.println(s);
        }
    }

    /**
     * true if running on Windows - otherwise Linux assumed
     */
    public static synchronized boolean osIsWindows() {
        String osName = System.getProperty("os.name").toLowerCase();
        return osName.indexOf("windows") != -1;
    }

    /**
     * make a temporary file with a unique name and delete on exit
     * @return non-null file
     */
    public static File makeTempFile() throws IOException {
        String prefix = UUID.randomUUID().toString(); // unique name
        String suffix = ".txt";
        File tempFile = File.createTempFile(prefix, suffix);
        tempFile.deleteOnExit(); // drop on shutdown
        return tempFile;
    }

    public static boolean executeCommandLine(String... args) throws IOException, InterruptedException {
        ProcessBuilder p = new ProcessBuilder(args);
        Process process = p.start();
        process.waitFor();
        return process.exitValue() == 0;
    }

    /**
     * write the string data to the file f
     * @param f    file to create
     * @param data data to write
     */
    public static boolean writeFile(File f, String data) throws IOException {
        PrintWriter out = new PrintWriter(new FileWriter(f));
        out.print(data);
        out.close();
        return true;
    }

    /**
     * read the entire file f back as a String
     */
    public static String readFile(File f) throws IOException {
        StringBuilder sb = new StringBuilder();
        BufferedReader rdr = new BufferedReader(new FileReader(f));
        int c;
        while ((c = rdr.read()) != -1) {
            sb.append((char) c);
        }
        rdr.close();
        return sb.toString();
    }
}
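As a side note, the shell indirection used above for the output redirect (cmd /c on Windows versus /bin/sh -c elsewhere) can be avoided with ProcessBuilder.redirectOutput, which removes the OS-specific branch for the redirect itself. A minimal sketch, still using cat as the stand-in tool (cat is not present on stock Windows, so that portability caveat remains):

```java
import java.io.File;
import java.io.IOException;

public class RedirectCat {
    // Run "cat f1 f2" with stdout sent straight to f3 -- no shell needed,
    // and no quoting issues if the temp paths ever contain spaces.
    public static boolean catTo(File f1, File f2, File f3)
            throws IOException, InterruptedException {
        Process p = new ProcessBuilder("cat", f1.getAbsolutePath(), f2.getAbsolutePath())
                .redirectOutput(f3)
                .start();
        return p.waitFor() == 0;
    }
}
```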