Re: writing to local files on a worker
f));
        if (out != null) {
            out.print(data);
            out.close();
            return true;
        }
        return false; // failure
    }

    /**
     * @name readFile
     * @function read the contents of the file f as a string
     * @param f the file to read
     * @return contents of a text file
     */
    public static String readFile(File f) throws IOException {
        LineNumberReader rdr = new LineNumberReader(new FileReader(f));
        StringBuilder sb = new StringBuilder();
        String line = rdr.readLine();
        while (line != null) {
            sb.append(line);
            sb.append("\n");
            line = rdr.readLine();
        }
        rdr.close();
        return sb.toString();
    }
}

On Mon, Nov 12, 2018 at 9:20 AM Steve Lewis wrote:

> I have been looking at Spark-Blast, which calls Blast - a well-known C++
> program - in parallel. In my case I have tried to translate the C++ code
> to Java but am not getting the same results - it is convoluted.
> I have code that will call the program and read its results - the only
> real issue is that the program wants local files. Their use is convoluted,
> with many seeks, so replacement with streaming will not work. As long as
> my Java code can write to a local file for the duration of one call,
> things can work.
>
> I considered in-memory files, as long as they can be passed to another
> program - I am willing to have OS-specific code.
> So my issue is I need to write 3 files, run a program, and read one output
> file - then all files can be deleted.
> JNI calls will be hard - this is a program, not a library, and it is
> available on the worker nodes.
>
> On Sun, Nov 11, 2018 at 10:52 PM Jörn Franke wrote:
>
>> Can you use JNI to call the C++ functionality directly from Java?
>>
>> Or you wrap this into a MR step outside Spark and use Hadoop Streaming
>> (it allows you to use shell scripts as mapper and reducer)?
>>
>> You can also write temporary files for each partition and execute the
>> software within a map step.
>>
>> Generally you should not call external applications from Spark.
>>
>> > On 11.11.2018 at 23:13, Steve Lewis wrote:
>> >
>> > I have a problem where a critical step needs to be performed by a
>> > third-party C++ application. I can send or install this program on the
>> > worker nodes. I can construct a function holding all the data this
>> > program needs to process. The problem is that the program is designed
>> > to read and write from the local file system. I can call the program
>> > from Java and read its output as a local file - then delete all
>> > temporary files - but I doubt that it is possible to get the program
>> > to read from HDFS or any shared file system.
>> > My question is: can a function running on a worker node create
>> > temporary files and pass the names of these to a local process,
>> > assuming everything is cleaned up after the call?
>> >
>> > --
>> > Steven M. Lewis PhD
>> > 4221 105th Ave NE
>> > Kirkland, WA 98033
>> > 206-384-1340 (cell)
>> > Skype lordjoe_com

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
Re: writing to local files on a worker
I have been looking at Spark-Blast, which calls Blast - a well-known C++ program - in parallel. In my case I have tried to translate the C++ code to Java but am not getting the same results - it is convoluted.

I have code that will call the program and read its results - the only real issue is that the program wants local files. Their use is convoluted, with many seeks, so replacement with streaming will not work. As long as my Java code can write to a local file for the duration of one call, things can work.

I considered in-memory files, as long as they can be passed to another program - I am willing to have OS-specific code. So my issue is I need to write 3 files, run a program, and read one output file - then all files can be deleted. JNI calls will be hard - this is a program, not a library, and it is available on the worker nodes.

On Sun, Nov 11, 2018 at 10:52 PM Jörn Franke wrote:

> Can you use JNI to call the C++ functionality directly from Java?
>
> Or you wrap this into a MR step outside Spark and use Hadoop Streaming
> (it allows you to use shell scripts as mapper and reducer)?
>
> You can also write temporary files for each partition and execute the
> software within a map step.
>
> Generally you should not call external applications from Spark.
>
> > On 11.11.2018 at 23:13, Steve Lewis wrote:
> >
> > I have a problem where a critical step needs to be performed by a
> > third-party C++ application. I can send or install this program on the
> > worker nodes. I can construct a function holding all the data this
> > program needs to process. The problem is that the program is designed
> > to read and write from the local file system. I can call the program
> > from Java and read its output as a local file - then delete all
> > temporary files - but I doubt that it is possible to get the program
> > to read from HDFS or any shared file system.
> > My question is: can a function running on a worker node create
> > temporary files and pass the names of these to a local process,
> > assuming everything is cleaned up after the call?
> >
> > --
> > Steven M. Lewis PhD
> > 4221 105th Ave NE
> > Kirkland, WA 98033
> > 206-384-1340 (cell)
> > Skype lordjoe_com

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
writing to local files on a worker
I have a problem where a critical step needs to be performed by a third-party C++ application. I can send or install this program on the worker nodes. I can construct a function holding all the data this program needs to process.

The problem is that the program is designed to read and write from the local file system. I can call the program from Java and read its output as a local file - then delete all temporary files - but I doubt that it is possible to get the program to read from HDFS or any shared file system.

My question is: can a function running on a worker node create temporary files and pass the names of these to a local process, assuming everything is cleaned up after the call?

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
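[Editor's note] The per-call lifecycle described above - create local scratch files on the worker, run the external program against them, read its output, then delete everything - can be sketched in plain Java. This is only a sketch under assumptions: the class and method names are invented, and the external command in the comment is a placeholder for the real binary.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class LocalScratch {
    /** Create a per-call scratch directory on the worker's local disk. */
    public static File makeScratchDir() throws IOException {
        return Files.createTempDirectory("extcall-").toFile();
    }

    /** Run an external command with its working directory set to the scratch
     *  dir. The command name passed in is whatever the real binary is. */
    public static int runInDir(File dir, String... command)
            throws IOException, InterruptedException {
        ProcessBuilder pb = new ProcessBuilder(command);
        pb.directory(dir);
        pb.inheritIO();
        return pb.start().waitFor();
    }

    /** Delete the scratch directory and everything in it after the call. */
    public static void cleanup(File dir) {
        File[] files = dir.listFiles();
        if (files != null) {
            for (File f : files) f.delete();
        }
        dir.delete();
    }

    public static void main(String[] args) throws Exception {
        File scratch = makeScratchDir();
        File input = new File(scratch, "query.in");
        Files.write(input.toPath(), "some input data\n".getBytes());
        // runInDir(scratch, "the-real-binary", "query.in"); // placeholder call
        System.out.println(input.exists());
        cleanup(scratch);
        System.out.println(scratch.exists());
    }
}
```

Inside a Spark map function the same three steps would run once per partition or per record, with cleanup in a finally block so a failed external call does not leak files.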
No space left on device
We are trying to run a job that has previously run on Spark 1.3 on a different cluster. The job was converted to Spark 2.3, and this is a new cluster. The job dies after completing about a half dozen stages with

    java.io.IOException: No space left on device

It appears that the nodes are using local storage as tmp. I could use help diagnosing the issue and how to fix it. Here are the Spark conf properties:

Spark Conf Properties
spark.driver.extraJavaOptions=-Djava.io.tmpdir=/scratch/home/int/eva/zorzan/sparktmp/
spark.master=spark://10.141.0.34:7077
spark.mesos.executor.memoryOverhead=3128
spark.shuffle.consolidateFiles=true
spark.shuffle.spill=false
spark.app.name=Anonymous
spark.shuffle.manager=sort
spark.storage.memoryFraction=0.3
spark.jars=file:/home/int/eva/zorzan/bin/SparkHydraV2-master/HydraSparkBuilt.jar
spark.ui.killEnabled=true
spark.shuffle.spill.compress=true
spark.shuffle.sort.bypassMergeThreshold=100
com.lordjoe.distributed.marker_property=spark_property_set
spark.executor.memory=12g
spark.mesos.coarse=true
spark.shuffle.memoryFraction=0.4
spark.serializer=org.apache.spark.serializer.KryoSerializer
spark.kryo.registrator=com.lordjoe.distributed.hydra.HydraKryoSerializer
spark.default.parallelism=360
spark.io.compression.codec=lz4
spark.reducer.maxMbInFlight=128
spark.hadoop.validateOutputSpecs=false
spark.submit.deployMode=client
spark.local.dir=/scratch/home/int/eva/zorzan/sparktmp
spark.shuffle.file.buffer.kb=1024

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
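[Editor's note] One quick way to confirm which directory the executors are actually filling is to check free space on the candidate scratch paths from a small stdlib program run on a worker node. This is a hedged sketch - the candidate paths are placeholders for the spark.local.dir and java.io.tmpdir values in the conf above.

```java
import java.io.File;

public class DiskCheck {
    /** Usable bytes on the filesystem holding the given path. */
    public static long usableBytes(String path) {
        return new File(path).getUsableSpace();
    }

    public static void main(String[] args) {
        // Placeholder paths - substitute the values from your Spark conf.
        String[] candidates = { System.getProperty("java.io.tmpdir"), "/tmp" };
        for (String p : candidates) {
            System.out.println(p + " -> " + usableBytes(p) + " bytes free");
        }
    }
}
```

If the directory named by spark.local.dir sits on a small partition, shuffle spill files for a large job can exhaust it even when the data volume fits elsewhere on the node.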
How do i get a spark instance to use my log4j properties
Ok, I am stymied. I have tried everything I can think of to get Spark to use my own version of log4j.properties.

In the launcher code - I launch a local instance from a Java application - I say

    -Dlog4j.configuration=conf/log4j.properties

where conf/log4j.properties is relative to user.dir - no luck. Spark always starts saying

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties

I have a directory conf with my log4j.properties there, but it seems to be ignored. I use Maven and am VERY RELUCTANT to edit the Spark jars. I know this point has been discussed here before, but I do not see a clean answer.
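[Editor's note] One thing worth checking: log4j 1.x treats log4j.configuration as a URL (falling back to a classpath resource), so a file: prefix with an absolute path is the safer form than a bare relative path. A hypothetical sketch of setting it programmatically, before any Spark (and therefore log4j) class is touched - the conf path is a placeholder:

```java
import java.io.File;

public class Log4jSetup {
    /** Point log4j 1.x at a properties file as a file: URL.
     *  Must run before log4j initializes, i.e. before any Spark class loads. */
    public static String useLog4jConfig(String relativePath) {
        String url = "file:" + new File(relativePath).getAbsolutePath();
        System.setProperty("log4j.configuration", url);
        return url;
    }

    public static void main(String[] args) {
        // "conf/log4j.properties" is a placeholder path.
        System.out.println(useLog4jConfig("conf/log4j.properties"));
        // ... create the SparkContext only after this point ...
    }
}
```

The same idea on the command line would pass the file: URL through the driver JVM options rather than as a plain path; the key assumption here is that the property is read before log4j's static initialization runs.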
I need some help making datasets with known columns from a JavaBean
I asked a similar question a day or so ago, but this is a much more concrete example showing the difficulty I am running into. I am trying to use Datasets. I have an object which I want to encode with its fields as columns. The object is a well-behaved Java bean. However, one field is an object (or a collection of objects) which is not a bean. My simple code case is like this. What I want is a Dataset of MyBeans with columns count, name, and unBean.

    /**
     * This class is a good Java bean but one field holds an object
     * which is not a bean
     */
    public class MyBean implements Serializable {
        private int m_count;
        private String m_Name;
        private MyUnBean m_UnBean;

        public MyBean(int count, String name, MyUnBean unBean) {
            m_count = count;
            m_Name = name;
            m_UnBean = unBean;
        }

        public int getCount() { return m_count; }
        public void setCount(int count) { m_count = count; }
        public String getName() { return m_Name; }
        public void setName(String name) { m_Name = name; }
        public MyUnBean getUnBean() { return m_UnBean; }
        public void setUnBean(MyUnBean unBean) { m_UnBean = unBean; }
    }

    /**
     * This is a Java object which is not a bean -
     * no getters or setters, but it is serializable
     */
    public class MyUnBean implements Serializable {
        public final int count;
        public final String name;

        public MyUnBean(int count, String name) {
            this.count = count;
            this.name = name;
        }
    }

    /**
     * This code creates a list of objects containing MyBean -
     * a Java bean containing one field which is not a bean.
     * It then attempts, and fails, to use a bean encoder
     * to make a Dataset.
     */
    public class DatasetTest {
        public static final Random RND = new Random();
        public static final int LIST_SIZE = 100;

        public static String makeName() {
            return Integer.toString(RND.nextInt());
        }

        public static MyUnBean makeUnBean() {
            return new MyUnBean(RND.nextInt(), makeName());
        }

        public static MyBean makeBean() {
            return new MyBean(RND.nextInt(), makeName(), makeUnBean());
        }

        /**
         * Make a list of MyBeans
         */
        public static List<MyBean> makeBeanList() {
            List<MyBean> holder = new ArrayList<MyBean>();
            for (int i = 0; i < LIST_SIZE; i++) {
                holder.add(makeBean());
            }
            return holder;
        }

        public static SQLContext getSqlContext() {
            SparkConf sparkConf = new SparkConf();
            sparkConf.setAppName("BeanTest");
            Option<String> option = sparkConf.getOption("spark.master");
            if (!option.isDefined()) // use local over nothing
                sparkConf.setMaster("local[*]");
            JavaSparkContext ctx = new JavaSparkContext(sparkConf);
            return new SQLContext(ctx);
        }

        public static void main(String[] args) {
            SQLContext sqlContext = getSqlContext();
            Encoder<MyBean> evidence = Encoders.bean(MyBean.class);
            Encoder<MyUnBean> evidence2 = Encoders.javaSerialization(MyUnBean.class);
            List<MyBean> holder = makeBeanList();
            Dataset<MyBean> beanSet = sqlContext.createDataset(holder, evidence);
            long count = beanSet.count();
            if (count != LIST_SIZE)
                throw new IllegalStateException("bad count");
        }
    }

This is the last section of the log showing the errors I get:

16/03/01 09:21:31 INFO SparkUI: Started SparkUI at http://169.254.87.23:4040
16/03/01 09:21:31 INFO Executor: Starting executor ID driver on host localhost
16/03/01 09:21:31 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 61922.
16/03/01 09:21:31 INFO NettyBlockTransferService: Server created on 61922
16/03/01 09:21:31 INFO BlockManagerMaster: Trying to register BlockManager
16/03/01 09:21:31 INFO BlockManagerMasterEndpoint: Registering block manager localhost:61922 with 5.1 GB RAM, BlockManagerId(driver, localhost, 61922)
16/03/01 09:21:31 INFO BlockManagerMaster: Registered BlockManager
Exception in thread "main" java.lang.UnsupportedOperationException: no encoder found for com.lordjoe.testing.MyUnBean
	at org.apache.spark.sql.catalyst.JavaTypeInference$.org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor(JavaTypeInference.scala:403)
	at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor$1.apply(JavaTypeInference.scala:400)
	at org.apache.spark.sql.catalyst.JavaTypeInference$$anonfun$org$apache$spark$sql$catalyst$JavaTypeInference$$extractorFor$1.apply(JavaTypeInference.scala:393)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
	at
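[Editor's note] The "no encoder found" error arises because Encoders.bean recurses into every field, and MyUnBean has no bean-style accessors. One workaround, assuming the nested class can be modified, is to make it bean-compliant too: a public no-arg constructor plus getters and setters. A hypothetical bean-style rewrite (the class name is invented):

```java
import java.io.Serializable;

/** A bean-compliant stand-in for MyUnBean: mutable fields with a no-arg
 *  constructor and getters/setters, so a bean encoder can recurse into it. */
public class MyUnBeanAsBean implements Serializable {
    private int count;
    private String name;

    public MyUnBeanAsBean() { }  // required by bean-style encoders

    public MyUnBeanAsBean(int count, String name) {
        this.count = count;
        this.name = name;
    }

    public int getCount() { return count; }
    public void setCount(int count) { this.count = count; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
```

With MyBean holding this type instead, Encoders.bean(MyBean.class) should be able to map the unBean field as a nested struct column; the trade-off is giving up the final fields of the original class.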
DataSet Evidence
I have a relatively complex Java object that I would like to use in a dataset. If I say

    Encoder<MyType> evidence = Encoders.kryo(MyType.class);
    JavaRDD<MyType> rddMyType = generateRDD(); // some code
    Dataset<MyType> datasetMyType = sqlCtx.createDataset(rddMyType.rdd(), evidence);

I get one column - the whole object.

The object is a bean with all fields having getters and setters, but some of the fields are other complex Java objects. It would be fine to serialize the objects in these fields with Kryo or Java serialization, but the bean serializer treats all referenced objects as beans, and some lack the required getters and setters.

How can I get my columns with the bean serializer even if some of the values in the columns are not bean types?
Datasets and columns
assume I have the following code

    SparkConf sparkConf = new SparkConf();
    JavaSparkContext sqlCtx = new JavaSparkContext(sparkConf);
    JavaRDD<MyType> rddMyType = generateRDD(); // some code
    Encoder<MyType> evidence = Encoders.kryo(MyType.class);
    Dataset<MyType> datasetMyType = sqlCtx.createDataset(rddMyType.rdd(), evidence);

Now I have a Dataset of MyType, and assume there is some data. Assume MyType has bean fields with getters and setters as well as some internal collections and other data. What can I say about datasetMyType?

Does datasetMyType have columns, and if so, what?

If not, are there other ways to make a Dataset with columns, and if so, what are they?
Re: Datasets and columns
Ok, when I look at the schema it looks like Kryo makes one column. Is there a way to do a custom encoder with my own columns?

On Jan 25, 2016 1:30 PM, "Michael Armbrust" <mich...@databricks.com> wrote:

> The encoder is responsible for mapping your class onto some set of
> columns. Try running: datasetMyType.printSchema()
>
> On Mon, Jan 25, 2016 at 1:16 PM, Steve Lewis <lordjoe2...@gmail.com> wrote:
>
>> assume I have the following code
>>
>>     SparkConf sparkConf = new SparkConf();
>>     JavaSparkContext sqlCtx = new JavaSparkContext(sparkConf);
>>     JavaRDD<MyType> rddMyType = generateRDD(); // some code
>>     Encoder<MyType> evidence = Encoders.kryo(MyType.class);
>>     Dataset<MyType> datasetMyType = sqlCtx.createDataset(rddMyType.rdd(), evidence);
>>
>> Now I have a Dataset of MyType, and assume there is some data.
>>
>> Assume MyType has bean fields with getters and setters as well as some
>> internal collections and other data. What can I say about datasetMyType?
>>
>> Does datasetMyType have columns, and if so, what?
>>
>> If not, are there other ways to make a Dataset with columns, and if so,
>> what are they?
Re: I need help mapping a PairRDD solution to Dataset
Thanks - this helps a lot, except for the issue of looking at schools in neighboring regions.

On Wed, Jan 20, 2016 at 10:43 AM, Michael Armbrust <mich...@databricks.com> wrote:

> The analog to PairRDD is a GroupedDataset (created by calling groupBy),
> which offers similar functionality but doesn't require you to construct
> new objects that are in the form of key/value pairs. It doesn't matter if
> they are complex objects, as long as you can create an encoder for them
> (currently supported for JavaBeans and case classes, but support for
> custom encoders is on the roadmap). These encoders are responsible both
> for fast serialization and for providing a view of your object that looks
> like a row.
>
> Based on the description of your problem, it sounds like you can use
> joinWith and just express the predicate as a column.
>
>     import org.apache.spark.sql.functions._
>     ds1.as("child").joinWith(ds2.as("school"), expr("child.region = school.region"))
>
> The as operation is only required if you need to differentiate columns on
> either side that have the same name.
>
> Note that by defining the join condition as an expression instead of a
> lambda function, we are giving Spark SQL more information about the join,
> so it can often do the comparison without needing to deserialize the
> object, which over time will let us put more optimizations into the
> engine.
>
> You can also do this using lambda functions if you want, though:
>
>     ds1.groupBy(_.region).cogroup(ds2.groupBy(_.region)) { (key, iter1, iter2) =>
>       ...
>     }
>
> On Wed, Jan 20, 2016 at 10:26 AM, Steve Lewis <lordjoe2...@gmail.com> wrote:
>
>> We have been working on a large search problem which we have been
>> solving in the following way.
>>
>> We have two sets of objects, say children and schools. The objective is
>> to find the closest school to each child. There is a distance measure,
>> but it is relatively expensive and would be very costly to apply to all
>> pairs.
>>
>> However, the map can be divided into regions. If we assume that the
>> closest school to a child is in his region or a neighboring region, we
>> need only compute the distance between a child and all schools in his
>> region and neighboring regions.
>>
>> We currently use paired RDDs and a join to do this, assigning children
>> to one region and schools to their own region and neighboring regions,
>> and then creating a join and computing distances. Note the real problem
>> is more complex.
>>
>> I can create Datasets of the two types of objects but see no Dataset
>> analog for a PairRDD. How could I map my solution using PairRDDs to
>> Datasets - assume the two objects are relatively complex data types and
>> do not look like SQL dataset rows?

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
I need help mapping a PairRDD solution to Dataset
We have been working on a large search problem which we have been solving in the following way.

We have two sets of objects, say children and schools. The objective is to find the closest school to each child. There is a distance measure, but it is relatively expensive and would be very costly to apply to all pairs.

However, the map can be divided into regions. If we assume that the closest school to a child is in his region or a neighboring region, we need only compute the distance between a child and all schools in his region and neighboring regions.

We currently use paired RDDs and a join to do this, assigning children to one region and schools to their own region and neighboring regions, and then creating a join and computing distances. Note the real problem is more complex.

I can create Datasets of the two types of objects but see no Dataset analog for a PairRDD. How could I map my solution using PairRDDs to Datasets - assume the two objects are relatively complex data types and do not look like SQL dataset rows?
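[Editor's note] The pruning idea above - compare a child only against schools in its own and neighboring regions - can be sketched with plain Java collections before worrying about the Spark mapping. Everything here is invented for illustration: regions are integer buckets on a line, and absolute difference stands in for the expensive distance measure.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class NearestSchool {
    /** Group school positions by integer region id (region = floor(position)). */
    public static Map<Integer, List<Double>> byRegion(List<Double> schools) {
        Map<Integer, List<Double>> m = new HashMap<>();
        for (double s : schools) {
            m.computeIfAbsent((int) Math.floor(s), k -> new ArrayList<>()).add(s);
        }
        return m;
    }

    /** Find the closest school to a child, checking only the child's region
     *  and its two neighbors - the pruning described in the post.
     *  Returns NaN if no school exists in those regions. */
    public static double closest(double child, Map<Integer, List<Double>> regions) {
        int r = (int) Math.floor(child);
        double best = Double.NaN;
        double bestDist = Double.POSITIVE_INFINITY;
        for (int dr = -1; dr <= 1; dr++) {
            for (double s : regions.getOrDefault(r + dr, Collections.emptyList())) {
                double d = Math.abs(child - s); // stand-in for the expensive measure
                if (d < bestDist) { bestDist = d; best = s; }
            }
        }
        return best;
    }
}
```

In Spark terms, byRegion corresponds to the keying step (with schools emitted under their own and neighboring region keys) and closest to the per-key comparison after the join or cogroup.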
Strange Set of errors
I am running on a Spark 1.5.1 cluster managed by Mesos. I have an application that handles a chemistry problem whose size can be increased by increasing the number of atoms - increasing the number of Spark stages. I do a repartition at each stage - Stage 9 is the last stage. At each stage the size and complexity increase by a factor of 8 or so.

Problems with 8 stages run with no difficulty - ones with 9 stages never work: they always crash in a manner similar to the stack dump below (sorry for the length, but none of the steps are mine). I do not see any slaves throwing an exception (which has different errors anyway). I am completely baffled and believe the error is in something Spark is doing. I use 7000 or so tasks to try to divide the work - I see the same issue when I cut the parallelism to 256, but tasks run longer - my mean task takes about 5 minutes (oh yes, I expect the job to take about 8 hours on my 15-node cluster). Any bright ideas?

[Stage 9:==> (5827 + 60) / 7776]Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because Stage 9 was cancelled
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
	at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1229)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply$mcVI$sp(DAGScheduler.scala:1217)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1216)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleStageCancellation$1.apply(DAGScheduler.scala:1216)
	at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofInt.foreach(ArrayOps.scala:156)
	at org.apache.spark.scheduler.DAGScheduler.handleStageCancellation(DAGScheduler.scala:1216)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1469)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1822)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1835)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1848)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1919)
	at org.apache.spark.rdd.RDD.count(RDD.scala:1121)
	at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:445)
	at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:47)
	at com.lordjoe.molgen.SparkAtomGenerator.run(SparkAtomGenerator.java:150)
	at com.lordjoe.molgen.SparkAtomGenerator.run(SparkAtomGenerator.java:110)
	at com.lordjoe.molgen.VariantCounter.main(VariantCounter.java:80)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:497)
	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672)
	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
15/12/14 09:53:20 WARN ServletHandler: /stages/stage/kill/
java.lang.InterruptedException: sleep interrupted
	at java.lang.Thread.sleep(Native Method)
	at org.apache.spark.ui.jobs.StagesTab.handleKillRequest(StagesTab.scala:49)
	at org.apache.spark.ui.SparkUI$$anonfun$3.apply(SparkUI.scala:71)
	at org.apache.spark.ui.SparkUI$$anonfun$3.apply(SparkUI.scala:71)
	at org.apache.spark.ui.JettyUtils$$anon$2.doRequest(JettyUtils.scala:141)
	at org.apache.spark.ui.JettyUtils$$anon$2.doGet(JettyUtils.scala:128)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:735)
	at javax.servlet.http.HttpServlet.service(HttpServlet.java:848)
	at org.spark-project.jetty.servlet.ServletHolder.handle(ServletHolder.java:684)
	at org.spark-project.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:501)
	at org.spark-project.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1086)
	at
Has the format of a spark jar file changed in 1.5
I have been using my own code to build the jar file I use for spark-submit. In 1.4 I could simply add all class and resource files I found on the class path to the jar, and add all jars on the classpath into a directory called lib in the jar file. In 1.5 I see that resources and classes in jars in the lib directory are not being found, and I am forced to add them at the top level.

Has something changed recently in the structure of Spark jar files, or in how the class loader works? I find little documentation on the structure of a Spark jar used in spark-submit.
Looking for a few Spark Benchmarks
I was in a discussion with someone who works for a cloud provider which offers Spark/Hadoop services. We got into a discussion of performance, the bewildering array of machine types, and the problem of selecting a cluster - say 20 Large instances vs 10 Jumbo instances - or the trade-offs between the cost of running a problem for longer on a small cluster vs shorter on a large cluster. He offered to run some standard Spark jobs on a number of clusters of different size and machine type and post the results.

I thought if we could find a half dozen benchmarks (including data) which differ in CPU, IO, and memory requirements, and which are open source and well known, the post might help users, since they could look at the posted data and select an optimal configuration for the benchmark closest to their case. Problems sized to take about 15 minutes on a medium 16-node cluster would probably be good, because setup and deployment tend to obscure runtime issues. Terasort comes to mind as one problem - I suspect the ADAM group might have a biological problem like K-mers - but I am looking for a few others.
Re: How can I force operations to complete and spool to disk
I give the executor 14 GB and would like to cut it. I expect the critical operations to run hundreds of millions of times, which is why we run on a cluster. I will try DISK_ONLY_SER.
Thanks
Steven Lewis, sent from my phone

On May 7, 2015 10:59 AM, ayan guha guha.a...@gmail.com wrote:

> 2*2 cents
>
> 1. You can try repartition and give a large number to achieve smaller
> partitions.
> 2. OOM errors can be avoided by increasing executor memory or using
> off-heap storage.
> 3. How are you persisting? You can try persist using the DISK_ONLY_SER
> storage level.
> 4. You may take a look at the algorithm once more. "Tasks typically
> perform both operations several hundred thousand times." Why can it not
> be done in a distributed way?
>
> On Thu, May 7, 2015 at 3:16 PM, Steve Lewis lordjoe2...@gmail.com wrote:
>
>> I am performing a job where I perform a number of steps in succession.
>> One step is a map on a JavaRDD which generates objects taking up
>> significant memory. This is followed by a join and an aggregateByKey.
>> The problem is that the system is getting OutOfMemoryErrors - most tasks
>> work, but a few fail. Tasks typically perform both operations several
>> hundred thousand times. I am convinced things would work if the map ran
>> to completion and shuffled results to disk before starting the
>> aggregateByKey. I tried calling persist and then count on the results
>> of the map to force execution, but this does not seem to help. Smaller
>> partitions might also help if these could be forced. Any ideas?
>
> --
> Best Regards,
> Ayan Guha
How can I force operations to complete and spool to disk
I am performing a job where I perform a number of steps in succession. One step is a map on a JavaRDD which generates objects taking up significant memory. This is followed by a join and an aggregateByKey.

The problem is that the system is getting OutOfMemoryErrors - most tasks work, but a few fail. Tasks typically perform both operations several hundred thousand times. I am convinced things would work if the map ran to completion and shuffled results to disk before starting the aggregateByKey. I tried calling persist and then count on the results of the map to force execution, but this does not seem to help. Smaller partitions might also help if these could be forced. Any ideas?
Re: Can a map function return null
So you imagine something like this:

    JavaRDD<String> words = ...
    JavaRDD<Optional<String>> wordsFiltered = words.map(new Function<String, Optional<String>>() {
        @Override
        public Optional<String> call(String s) throws Exception {
            if ((s.length()) % 2 == 1) // drop strings of odd length
                return Optional.empty();
            else
                return Optional.of(s);
        }
    });

That seems to return the wrong type: a JavaRDD<Optional<String>>, which cannot be used as the JavaRDD<String> the next step expects.

On Sun, Apr 19, 2015 at 12:17 PM, Evo Eftimov evo.efti...@isecc.com wrote:

> I am on the move at the moment so I can't try it immediately, but from
> previous memory/experience I think if you return plain null you will get
> a Spark exception.
> Anyway you can try it and see what happens and then ask the question.
> If you do get an exception, try Optional instead of plain null.
>
> Sent from Samsung Mobile
>
> -------- Original message --------
> From: Olivier Girardot
> Date: 2015/04/18 22:04 (GMT+00:00)
> To: Steve Lewis, user@spark.apache.org
> Subject: Re: Can a map function return null
>
> You can return an RDD with null values inside, and afterwards filter on
> item != null. In Scala (or even in Java 8) you'd rather use
> Option/Optional, and in Scala they're directly usable from Spark.
> Example:
>
>     sc.parallelize(1 to 1000).flatMap(item => if (item % 2 == 0) Some(item) else None).collect()
>     res0: Array[Int] = Array(2, 4, 6, ...)
>
> Regards,
> Olivier.
>
> On Sat, Apr 18, 2015 at 20:44, Steve Lewis lordjoe2...@gmail.com wrote:
>
>> I find a number of cases where I have a JavaRDD and I wish to transform
>> the data and, depending on a test, return 0 or one item (don't suggest a
>> filter - the real case is more complex). So I currently do something
>> like the following - perform a flatMap returning a list with 0 or 1
>> entries depending on the isUsed function.
>>
>>     JavaRDD<Foo> original = ...
>>     JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
>>         @Override
>>         public Iterable<Foo> call(final Foo s) throws Exception {
>>             List<Foo> ret = new ArrayList<Foo>();
>>             if (isUsed(s))
>>                 ret.add(transform(s));
>>             return ret; // contains 0 items if isUsed is false
>>         }
>>     });
>>
>> My question is: can I do a map returning the transformed data and null
>> if nothing is to be returned, as shown below - what does Spark do with a
>> map function returning null?
>>
>>     JavaRDD<Foo> words = original.map(new MapFunction<String, String>() {
>>         @Override
>>         Foo call(final Foo s) throws Exception {
>>             List<Foo> ret = new ArrayList<Foo>();
>>             if (isUsed(s))
>>                 return transform(s);
>>             return null; // not used - what happens now
>>         }
>>     });

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com
Can a map function return null
I find a number of cases where I have a JavaRDD and I wish to transform the data and, depending on a test, return 0 or one item (don't suggest a filter - the real case is more complex). So I currently do something like the following - perform a flatMap returning a list with 0 or 1 entries depending on the isUsed function.

    JavaRDD<Foo> original = ...
    JavaRDD<Foo> words = original.flatMap(new FlatMapFunction<Foo, Foo>() {
        @Override
        public Iterable<Foo> call(final Foo s) throws Exception {
            List<Foo> ret = new ArrayList<Foo>();
            if (isUsed(s))
                ret.add(transform(s));
            return ret; // contains 0 items if isUsed is false
        }
    });

My question is: can I do a map returning the transformed data and null if nothing is to be returned, as shown below - what does Spark do with a map function returning null?

    JavaRDD<Foo> words = original.map(new MapFunction<String, String>() {
        @Override
        Foo call(final Foo s) throws Exception {
            List<Foo> ret = new ArrayList<Foo>();
            if (isUsed(s))
                return transform(s);
            return null; // not used - what happens now
        }
    });
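[Editor's note] The flatMap pattern above (emit zero or one output per input, never null) can be exercised with plain Java streams, independent of Spark, since java.util.stream.Stream.flatMap has the same zero-or-one semantics. The isUsed and transform definitions below are invented stand-ins for the real predicate and transformation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ZeroOrOne {
    // Stand-ins for the isUsed/transform logic in the post.
    static boolean isUsed(String s) { return s.length() % 2 == 0; }
    static String transform(String s) { return s.toUpperCase(); }

    /** Emit zero or one output per input - the flatMap idiom, no nulls. */
    public static List<String> keepTransformed(List<String> in) {
        return in.stream()
                 .flatMap(s -> isUsed(s) ? Stream.of(transform(s)) : Stream.empty())
                 .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(keepTransformed(Arrays.asList("ab", "abc", "abcd")));
    }
}
```

The design point is the same one raised in the thread: a null flowing out of a map will blow up some later stage far from its source, whereas the empty case in a flatMap simply contributes nothing to the output.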
Fwd: Numbering RDD members Sequentially
---------- Forwarded message ----------
From: Steve Lewis lordjoe2...@gmail.com
Date: Wed, Mar 11, 2015 at 9:13 AM
Subject: Re: Numbering RDD members Sequentially
To: Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com

Perfect - exactly what I was looking for; not quite sure why it is called zipWithIndex, since zipping is not involved.

My code does something like this, where IMeasuredSpectrum is a large class we want to set an index for:

    public static JavaRDD<IMeasuredSpectrum> indexSpectra(JavaRDD<IMeasuredSpectrum> pSpectraToScore) {
        JavaPairRDD<IMeasuredSpectrum, Long> indexed = pSpectraToScore.zipWithIndex();
        pSpectraToScore = indexed.map(new AddIndexToSpectrum());
        return pSpectraToScore;
    }

    public class AddIndexToSpectrum implements Function<Tuple2<IMeasuredSpectrum, java.lang.Long>, IMeasuredSpectrum> {
        @Override
        public IMeasuredSpectrum call(final Tuple2<IMeasuredSpectrum, java.lang.Long> v1) throws Exception {
            IMeasuredSpectrum spec = v1._1();
            long index = v1._2();
            spec.setIndex(index + 1);
            return spec;
        }
    }

On Wed, Mar 11, 2015 at 6:57 AM, Daniel, Ronald (ELS-SDG) r.dan...@elsevier.com wrote:

> Have you looked at zipWithIndex?
>
> From: Steve Lewis [mailto:lordjoe2...@gmail.com]
> Sent: Tuesday, March 10, 2015 5:31 PM
> To: user@spark.apache.org
> Subject: Numbering RDD members Sequentially
>
> I have a Hadoop InputFormat which reads records and produces
>
>     JavaPairRDD<String, String> locatedData
>
> where _1() is a formatted version of the file location - like 12690,, 24386 .27523 ... - and _2() is the data to be processed.
>
> For historical reasons I want to convert _1() into an integer representing the record number, so keys become 0001, 002 ... (Yes, I know this cannot be done in parallel.)
>
> The PairRDD may be too large to collect and work on one machine, but small enough to handle on a single machine. I could use toLocalIterator to guarantee execution on one machine, but last time I tried this all kinds of jobs were launched to get the next element of the iterator, and I was not convinced this approach was efficient.
Numbering RDD members Sequentially
I have a Hadoop InputFormat which reads records and produces a JavaPairRDD<String, String> locatedData where _1() is a formatted version of the file location - like 12690,, 24386 .27523 ... - and _2() is the data to be processed. For historical reasons I want to convert _1() into an integer representing the record number, so keys become 0001, 0002 ... (Yes, I know this cannot be done in parallel.) The PairRDD may be too large to collect into a list on one machine but small enough to stream through on a single machine. I could use toLocalIterator to guarantee execution on one machine, but last time I tried this all kinds of jobs were launched to get the next element of the iterator and I was not convinced this approach was efficient. Any bright ideas?
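Once something like zipWithIndex supplies a sequential Long per record, the renumbering itself reduces to fixed-width formatting of that index. A minimal, Spark-free sketch of just that formatting step (the 8-digit width and the helper name are my own assumptions, not from the thread):

```java
// Sketch: turn a sequential zero-based index (as zipWithIndex would
// produce) into a 1-based, zero-padded record key. The width of 8 digits
// is an arbitrary choice; pick one wider than the expected record count.
public class RecordKeys {
    public static String toRecordKey(long index) {
        return String.format("%08d", index + 1); // 1-based, zero-padded
    }
}
```

Each pair's _1() would then be replaced by toRecordKey of its zipWithIndex value in an ordinary map step.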
Is there a way to read a parquet database without generating an RDD
I have an application where a function needs access to the results of a select from a Parquet database. Creating a JavaSQLContext and from it a JavaSchemaRDD as shown below works, but the parallelism is not needed - a simple JDBC call would do. Are there alternative, non-parallel ways to achieve the same result?

JavaSQLContext sqlContext = ...; // application code
JavaSchemaRDD parquetFile = sqlContext.parquetFile(MyDatabase);
parquetFile.registerAsTable("peptides");
JavaSchemaRDD binCounts = sqlContext.sql("SELECT * FROM " + peptides + " WHERE massBin = " + mzAsInt);
Iterator<Row> rowIterator = binCounts.toLocalIterator();
while (rowIterator.hasNext()) {
    Row rw = rowIterator.next();
    ... // application code
}
Is there a way (in Java) to turn Java Iterable into a JavaRDD?
I notice new methods such as JavaSparkContext.makeRDD (with few useful examples) - it takes a Seq, and while there are ways to turn a List into a Seq, I see nothing that accepts an Iterable.
Who is using Spark and related technologies for bioinformatics applications?
I am aware of the ADAM project in Berkeley, and I am working on proteomic searches - anyone else working in this space?
how to convert an rdd to a single output file
I have an RDD which is potentially too large to store in memory with collect. I want a single task to write the contents as a file to HDFS. Time is not a large issue but memory is. I use the following, converting my RDD (scans) to a local iterator. This works, but hasNext shows up as a separate task and takes on the order of 20 sec for a medium-sized job - is toLocalIterator a bad function to call in this case, and is there a better one?

public void writeScores(final Appendable out, JavaRDD<IScoredScan> scans) {
    writer.appendHeader(out, getApplication());
    Iterator<IScoredScan> scanIterator = scans.toLocalIterator();
    while (scanIterator.hasNext()) {
        IScoredScan scan = scanIterator.next();
        writer.appendScan(out, getApplication(), scan);
    }
    writer.appendFooter(out, getApplication());
}
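The streaming shape of the method above can be sketched without Spark: consume any Iterator (toLocalIterator would supply one) in a single pass, never materializing the whole collection. The header/footer strings below stand in for the poster's writer calls, whose implementations are not shown in the thread:

```java
import java.util.Arrays;
import java.util.Iterator;

// Spark-free sketch of the single-pass write pattern: stream an Iterator
// to one output without holding the whole collection in memory. The
// literal "header"/"footer" lines are placeholders for appendHeader /
// appendScan / appendFooter, which are the poster's own API.
public class SinglePassWriter {
    public static String write(Iterator<String> scans) {
        StringBuilder out = new StringBuilder();
        out.append("header\n");                    // writer.appendHeader(...)
        while (scans.hasNext()) {
            out.append(scans.next()).append("\n"); // writer.appendScan(...)
        }
        out.append("footer\n");                    // writer.appendFooter(...)
        return out.toString();
    }
}
```

Memory use stays bounded by one element at a time, which is the property the poster wants; the per-element latency of toLocalIterator is a separate cost this sketch does not model.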
Re: how to convert an rdd to a single output file
The objective is to let the Spark application generate a file in a format which can be consumed by other programs - as I said, I am willing to give up parallelism at this stage (all the expensive steps were earlier), but I do want an efficient way to pass once through an RDD without the requirement to hold it in memory as a list.

On Fri, Dec 12, 2014 at 12:22 PM, Sameer Farooqui same...@databricks.com wrote: Instead of doing this on the compute side, I would just write out the file with different blocks initially into HDFS and then use hadoop fs -getmerge or HDFSConcat to get one final output file. - SF

On Fri, Dec 12, 2014 at 11:19 AM, Steve Lewis lordjoe2...@gmail.com wrote: I have an RDD which is potentially too large to store in memory with collect. I want a single task to write the contents as a file to HDFS. Time is not a large issue but memory is. I use the following, converting my RDD (scans) to a local iterator. This works, but hasNext shows up as a separate task and takes on the order of 20 sec for a medium-sized job - is toLocalIterator a bad function to call in this case, and is there a better one?

public void writeScores(final Appendable out, JavaRDD<IScoredScan> scans) {
    writer.appendHeader(out, getApplication());
    Iterator<IScoredScan> scanIterator = scans.toLocalIterator();
    while (scanIterator.hasNext()) {
        IScoredScan scan = scanIterator.next();
        writer.appendScan(out, getApplication(), scan);
    }
    writer.appendFooter(out, getApplication());
}

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: how to convert an rdd to a single output file
what would good spill settings be?

On Fri, Dec 12, 2014 at 2:45 PM, Sameer Farooqui same...@databricks.com wrote: You could try re-partitioning or coalescing the RDD to one partition and then write it to disk. Make sure you have good spill settings enabled so that the RDD can spill to the local temp dirs if it has to.

On Fri, Dec 12, 2014 at 2:39 PM, Steve Lewis lordjoe2...@gmail.com wrote: The objective is to let the Spark application generate a file in a format which can be consumed by other programs - as I said, I am willing to give up parallelism at this stage (all the expensive steps were earlier), but I do want an efficient way to pass once through an RDD without the requirement to hold it in memory as a list.

On Fri, Dec 12, 2014 at 12:22 PM, Sameer Farooqui same...@databricks.com wrote: Instead of doing this on the compute side, I would just write out the file with different blocks initially into HDFS and then use hadoop fs -getmerge or HDFSConcat to get one final output file. - SF

On Fri, Dec 12, 2014 at 11:19 AM, Steve Lewis lordjoe2...@gmail.com wrote: I have an RDD which is potentially too large to store in memory with collect. I want a single task to write the contents as a file to HDFS. Time is not a large issue but memory is. I use the following, converting my RDD (scans) to a local iterator. This works, but hasNext shows up as a separate task and takes on the order of 20 sec for a medium-sized job - is toLocalIterator a bad function to call in this case, and is there a better one?

public void writeScores(final Appendable out, JavaRDD<IScoredScan> scans) {
    writer.appendHeader(out, getApplication());
    Iterator<IScoredScan> scanIterator = scans.toLocalIterator();
    while (scanIterator.hasNext()) {
        IScoredScan scan = scanIterator.next();
        writer.appendScan(out, getApplication(), scan);
    }
    writer.appendFooter(out, getApplication());
}

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: How can I create an RDD with millions of entries created programmatically
looks good, but how do I say that in Java? As far as I can see, sc.parallelize (in Java) has only one implementation, which takes a List - requiring an in-memory representation.

On Mon, Dec 8, 2014 at 12:06 PM, Daniel Darabos daniel.dara...@lynxanalytics.com wrote: Hi, I think you have the right idea. I would not even worry about flatMap.

val rdd = sc.parallelize(1 to 100, numSlices = 1000).map(x => generateRandomObject(x))

Then when you try to evaluate something on this RDD, it will happen partition-by-partition. So 1000 random objects will be generated at a time per executor thread.

On Mon, Dec 8, 2014 at 8:05 PM, Steve Lewis lordjoe2...@gmail.com wrote: I have a function which generates a Java object, and I want to explore failures which only happen when processing large numbers of these objects. The real code is reading a many-gigabyte file, but in the test code I can generate similar objects programmatically. I could create a small list, parallelize it, and then use flatMap to inflate it several times by a factor of 1000 (remember I can hold a list of 1000 items in memory but not a million). Are there better ideas - remember I want to create more objects than can be held in memory at once.

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
In Java how can I create an RDD with a large number of elements
assume I don't care about the values, which may be created in a later map - in Scala I can say

val rdd = sc.parallelize(1 to 10, numSlices = 1000)

but in Java JavaSparkContext can only parallelize a List - limited to Integer.MAX_VALUE elements and required to exist in memory. The best I can do on memory is to build my own List backed by a BitSet. Is there a JIRA asking for JavaSparkContext.parallelize to take an Iterable or an Iterator? I am trying to make an RDD with at least 100 million elements, and if possible several billion, to test performance issues on a large application.
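One commonly used workaround (my own suggestion, not something proposed in the message) is to parallelize only a small list of slice indices and have each partition generate its own sub-range lazily inside mapPartitions or flatMap. The driver then needs only the slice arithmetic, which is plain Java:

```java
// Sketch: compute the [start, end) bounds of slice i when a logical range
// of n elements is divided into numSlices pieces, distributing the
// remainder over the first slices. A driver can parallelize the slice
// indices 0..numSlices-1 (a tiny list) and each worker can then iterate
// its own bounds, so the full range never exists in memory anywhere.
public class RangeSlices {
    public static long[] slice(long n, int numSlices, int i) {
        long base = n / numSlices, rem = n % numSlices;
        long start = i * base + Math.min(i, rem);
        long end = start + base + (i < rem ? 1 : 0);
        return new long[] { start, end };
    }
}
```

With this, billions of elements cost only numSlices entries on the driver; each task generates elements on the fly between its bounds.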
I am having problems reading files in the 4GB range
I am using a custom Hadoop input format which works well on smaller files but fails with a file of about 4 GB - the format is generating about 800 splits, and all variables in my code are longs. Any suggestions? Is anyone reading files of this size?

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 113 in stage 0.0 failed 4 times, most recent failure: Lost task 113.3 in stage 0.0 (TID 38, pltrd022.labs.uninett.no): java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
    sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:836)
    org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:104)
    org.apache.spark.storage.BlockManager.doGetLocal(BlockManager.scala:452)
    org.apache.spark.storage.BlockManager.getLocal(BlockManager.scala:368)
    org.apache.spark.storage.BlockManager.get(BlockManager.scala:552)
    org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:44)
    org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
    org.apache.spark.scheduler.Task.run(Task.scala:54)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    java.lang.Thread.run(Thread.java:745)
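The "Size exceeds Integer.MAX_VALUE" frame from FileChannelImpl.map is the giveaway: Spark 1.x reads stored blocks through FileChannel.map, whose size argument is an int, so any single cached or shuffled block over 2 GB fails regardless of how scrupulously the user code uses longs. A sketch of the lower bound on partitions this implies (the helper is illustrative, not a Spark API):

```java
// Sketch: FileChannel.map takes an int size, so a single Spark 1.x block
// cannot exceed Integer.MAX_VALUE (~2 GB). This computes the minimum
// number of partitions needed so that evenly-sized blocks stay under
// that limit; in practice one uses several times more for headroom.
public class BlockLimit {
    public static long minPartitions(long totalBytes) {
        long limit = Integer.MAX_VALUE; // 2^31 - 1 bytes per block
        return (totalBytes + limit - 1) / limit; // ceiling division
    }
}
```

For a 4 GiB dataset this gives 3 as the bare minimum, so 800 splits should be plenty unless the data is skewed into a few huge partitions - which is worth checking in the Storage tab.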
Problems creating and reading a large test file
I am trying to look at problems reading a data file over 4 GB. In my testing I am trying to create such a file. My plan is to create a FASTA file (a simple format used in biology) looking like

>1
TCCTTACGGAGTTCGGGTGTTTATCTTACTTATCGCGGTTCGCTGCCGCTCCGGGAGCCCGGATAGGCTGCGTTAATACCTAAGGAGCGCGTATTG
>2
GTCTGATCTAAATGCGACGACGTCTTTAGTGCTAAGTGGAACCCAATCTTAAGACCCAGGCTCTTAAGCAGAAACAGACCGTCCCTGCCTCCTGGAGTAT
>3
...

I create a list with 5000 structures, use flatMap to add 5000 per entry, and then either call saveAsText or dnaFragmentIterator = mySet.toLocalIterator(); and write to HDFS. Then I try to call JavaRDD<String> lines = ctx.textFile(hdfsFileName); What I get on a 16 node cluster:

14/12/06 01:49:21 ERROR SendingConnection: Exception while reading SendingConnection to ConnectionManagerId(pltrd007.labs.uninett.no,50119) java.nio.channels.ClosedChannelException
14/12/06 01:49:35 ERROR BlockManagerMasterActor: Got two different block manager registrations on 20140711-081617-711206558-5050-2543-13

The code is at the link below - I did not want to spam the group, although it is only a couple of pages. I am baffled - there are no issues when I create a few thousand records, but things blow up when I try 25 million records, or a file of 6 GB or so. Can someone take a look - it is not a lot of code. https://drive.google.com/file/d/0B4cgoSGuA4KWUmo3UzBZRmU5M3M/view?usp=sharing
Failed to read chunk exception
I am running a large job using 4000 partitions - after running for four hours on a 16 node cluster it fails with the following message. The errors are in Spark code and seem to point at unreliability at the level of the disk. Has anyone seen this, and do you know what is going on and how to fix it?

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 15.827 failed 4 times, most recent failure: Lost task 13.3 in stage 15.827 (TID 13386, pltrd022.labs.uninett.no): java.io.IOException: failed to read chunk
    org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:348)
    org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:159)
    org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:142)
    ...
Re: Any ideas why a few tasks would stall
Thanks - I found the same thing - calling

boolean forceShuffle = true;
myRDD = myRDD.coalesce(120, forceShuffle);

worked - there were 120 partitions, but forcing a shuffle distributes the work. I believe there is a bug in my code causing memory to accumulate as partitions grow in size. With a job over ten times larger I ran into other issues when raising the number of partitions to 10,000 - namely too many open files.

On Thu, Dec 4, 2014 at 8:32 AM, Sameer Farooqui same...@databricks.com wrote: Good point, Ankit. Steve - You can click on the link for '27' in the first column to get a breakdown of how much data is in each of those 116 cached partitions. But really, you want to also understand how much data is in the 4 non-cached partitions, as they may be huge. One thing you can try doing is .repartition() on the RDD with something like 100 partitions and then cache this new RDD. See if that spreads the load between the partitions more evenly. Let us know how it goes.

On Thu, Dec 4, 2014 at 12:16 AM, Ankit Soni ankitso...@gmail.com wrote: I ran into something similar before. 19/20 partitions would complete very quickly, and 1 would take the bulk of the time and shuffle reads/writes. This was because the majority of partitions were empty and 1 had all the data. Perhaps something similar is going on here - I would suggest taking a look at how much data each partition contains and trying to achieve a roughly even distribution for best performance. In particular, if the RDDs are PairRDDs, partitions are assigned based on the hash of the key, so an even distribution of values among keys is required for an even split of data across partitions.
On December 2, 2014 at 4:15:25 PM, Steve Lewis (lordjoe2...@gmail.com) wrote:
1) I can go there but none of the links are clickable
2) When I see something like 116/120 partitions succeeded in the Stages UI, in the Storage UI I see the table below. NOTE: RDD 27 has 116 partitions cached - 4 not - and those 4 are exactly the ones which will not complete; also RDD 27 does not show up in the Stages UI. (Each RDD name links to http://hwlogin.labs.uninett.no:4040/storage/rdd?id=N; Size in Tachyon and Size on Disk were 0.0 B for every row.)

RDD   Storage Level                       Cached Partitions   Fraction Cached   Size in Memory
2     Memory Deserialized 1x Replicated                   1              100%          11.8 MB
14    Memory Deserialized 1x Replicated                   1              100%         122.7 MB
7     Memory Deserialized 1x Replicated                 120              100%         151.1 MB
1     Memory Deserialized 1x Replicated                   1              100%          65.6 MB
10    Memory Deserialized 1x Replicated                  24              100%         160.6 MB
27    Memory Deserialized 1x Replicated                 116               97%

On Tue, Dec 2, 2014 at 3:43 PM, Sameer Farooqui same...@databricks.com wrote: Have you tried taking thread dumps via the UI? There is a link to do so on the Executors page (typically under http://<driver IP>:4040/executors). By visualizing the thread call stack of the executors with slow-running tasks, you can see exactly what code is executing at an instant in time. If you sample the executor several times in a short time period, you can identify 'hot spots' or expensive sections in the user code.

On Tue, Dec 2, 2014 at 3:03 PM, Steve Lewis lordjoe2...@gmail.com wrote: I am working on a problem which will eventually involve many millions of function calls. I have a small sample with several thousand calls working, but when I try to scale up the amount of data things stall. I use 120 partitions and 116 finish in very little time. The remaining 4 seem to do all the work, and they stall after a fixed number (about 1000) of calls; even after hours they make no more progress. This is my first large and complex job with Spark and I would like any insight on how to debug the issue, or even better, why it might exist. The cluster has 15 machines and I am setting executor memory at 16G. Also, what other questions are relevant to solving the issue?

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
How can a function get a TaskContext
https://github.com/apache/spark/blob/master/core/src/main/java/org/apache/spark/TaskContext.java has a Java implementation of TaskContext with a very useful method:

/**
 * Return the currently active TaskContext. This can be called inside of
 * user functions to access contextual information about running tasks.
 */
public static TaskContext get() { return taskContext.get(); }

I would like to call this, but my Spark 1.1 code seems to use a Scala TaskContext lacking a get method. How can one get a TaskContext, and in which versions is get supported?
How can a function running on a slave access the Executor
I have been working on balancing work across a number of partitions and find it would be useful to access information about the current execution environment, much of which (like the executor ID) would be available if there were a way to get the current Executor or the Hadoop TaskAttempt context. Does anyone on the list know how to access this object from a function running on a slave? Currently I am reduced to tracking MAC addresses to at least know which machine the code is running on, but there must be a better way.
Re: Any ideas why a few tasks would stall
1) I can go there but none of the links are clickable
2) When I see something like 116/120 partitions succeeded in the Stages UI, in the Storage UI I see the table below. NOTE: RDD 27 has 116 partitions cached - 4 not - and those 4 are exactly the ones which will not complete; also RDD 27 does not show up in the Stages UI. (Each RDD name links to http://hwlogin.labs.uninett.no:4040/storage/rdd?id=N; Size in Tachyon and Size on Disk were 0.0 B for every row.)

RDD   Storage Level                       Cached Partitions   Fraction Cached   Size in Memory
2     Memory Deserialized 1x Replicated                   1              100%          11.8 MB
14    Memory Deserialized 1x Replicated                   1              100%         122.7 MB
7     Memory Deserialized 1x Replicated                 120              100%         151.1 MB
1     Memory Deserialized 1x Replicated                   1              100%          65.6 MB
10    Memory Deserialized 1x Replicated                  24              100%         160.6 MB
27    Memory Deserialized 1x Replicated                 116               97%

On Tue, Dec 2, 2014 at 3:43 PM, Sameer Farooqui same...@databricks.com wrote: Have you tried taking thread dumps via the UI? There is a link to do so on the Executors page (typically under http://<driver IP>:4040/executors). By visualizing the thread call stack of the executors with slow-running tasks, you can see exactly what code is executing at an instant in time. If you sample the executor several times in a short time period, you can identify 'hot spots' or expensive sections in the user code.

On Tue, Dec 2, 2014 at 3:03 PM, Steve Lewis lordjoe2...@gmail.com wrote: I am working on a problem which will eventually involve many millions of function calls. I have a small sample with several thousand calls working, but when I try to scale up the amount of data things stall. I use 120 partitions and 116 finish in very little time. The remaining 4 seem to do all the work, and they stall after a fixed number (about 1000) of calls; even after hours they make no more progress. This is my first large and complex job with Spark and I would like any insight on how to debug the issue, or even better, why it might exist. The cluster has 15 machines and I am setting executor memory at 16G. Also, what other questions are relevant to solving the issue?

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
How can a function access Executor ID, Function ID and other parameters
I am running on a 15 node cluster and am trying to set partitioning to balance the work across all nodes. I am using an Accumulator to track work by MAC address, but would prefer to use data known to the Spark environment - Executor ID and Function ID show up in the Spark UI, and Task ID and Attempt ID (assuming these work like Hadoop) would be useful. Does someone know how code running in a function can access these parameters? I think I have asked this group several times about Task ID and Attempt ID without getting a reply. Incidentally, the data I collect suggests that my execution is not at all balanced.
Why is this operation so expensive
I have a JavaPairRDD<KeyType, Tuple2<Type1, Type2>> originalPairs. There are on the order of 100 million elements. I call a function to rearrange the tuples:

JavaPairRDD<String, Tuple2<Type1, Type2>> newPairs = originalPairs.values().mapToPair(
    new PairFunction<Tuple2<Type1, Type2>, String, Tuple2<Type1, Type2>>() {
        @Override
        public Tuple2<String, Tuple2<Type1, Type2>> call(final Tuple2<Type1, Type2> t) {
            return new Tuple2<String, Tuple2<Type1, Type2>>(t._1().getId(), t);
        }
    });

where Type1.getId() returns a String. The data are spread across 120 partitions on 15 machines. The operation is dead simple, and yet it takes 5 minutes to generate the data and over 30 minutes to perform this simple operation. I am at a loss to understand what is taking so long or how to make it faster. At this stage there is no reason to move data to different partitions. Anyone have bright ideas? Oh yes - Type1 and Type2 are moderately complex objects weighing in at about 10 KB each.
Re: Why is this operation so expensive
If I combineByKey in the next step, I suppose I am paying for a shuffle I need anyway - right? Also, if I supply a custom partitioner rather than hash, can I control where and how data is shuffled? Overriding equals and hashCode could be a bad thing, but a custom partitioner is less dangerous.

On Tue, Nov 25, 2014 at 1:55 PM, Andrew Ash and...@andrewash.com wrote: Hi Steve, You changed the first value in a Tuple2, which is the one that Spark uses to hash and determine where in the cluster to place the value. By changing the first part of the PairRDD, you've implicitly asked Spark to reshuffle the data according to the new keys. I'd guess that you would observe large amounts of shuffle in the webui as a result of this code. If you don't actually need your data shuffled by the first part of the pair RDD, then consider making the KeyType not in the first half of the PairRDD. An alternative is to make the .equals() and .hashcode() of KeyType delegate to the .getId() method you use in the anonymous function. Cheers, Andrew

On Tue, Nov 25, 2014 at 10:06 AM, Steve Lewis lordjoe2...@gmail.com wrote: I have a JavaPairRDD<KeyType, Tuple2<Type1, Type2>> originalPairs. There are on the order of 100 million elements. I call a function to rearrange the tuples:

JavaPairRDD<String, Tuple2<Type1, Type2>> newPairs = originalPairs.values().mapToPair(
    new PairFunction<Tuple2<Type1, Type2>, String, Tuple2<Type1, Type2>>() {
        @Override
        public Tuple2<String, Tuple2<Type1, Type2>> call(final Tuple2<Type1, Type2> t) {
            return new Tuple2<String, Tuple2<Type1, Type2>>(t._1().getId(), t);
        }
    });

where Type1.getId() returns a String. The data are spread across 120 partitions on 15 machines. The operation is dead simple, and yet it takes 5 minutes to generate the data and over 30 minutes to perform this simple operation. I am at a loss to understand what is taking so long or how to make it faster. At this stage there is no reason to move data to different partitions. Anyone have bright ideas? Oh yes - Type1 and Type2 are moderately complex objects weighing in at about 10 KB each.

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
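Andrew's point can be made concrete: with the default HashPartitioner, each pair lands in the partition given by the non-negative remainder of its key's hashCode. Changing the key from KeyType to the id String changes that remainder for most records, so the "simple" map implicitly asks Spark to move on the order of 100 million 10 KB values across the network. A sketch of the partition computation (my own helper, mirroring the usual hash-partitioning formula rather than quoting Spark's source):

```java
// Sketch of hash partitioning: a pair goes to the partition given by the
// non-negative value of key.hashCode() mod numPartitions. Two different
// key objects (old KeyType vs. new String id) generally give different
// results, which is why rekeying forces a shuffle.
public class HashPartition {
    public static int partitionFor(Object key, int numPartitions) {
        int mod = key.hashCode() % numPartitions;
        return mod < 0 ? mod + numPartitions : mod; // Java's % can be negative
    }
}
```

Delegating KeyType's hashCode/equals to getId(), as Andrew suggests, makes both keyings agree on this value, so no data needs to move.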
How do I get the executor ID from running Java code
The Spark UI lists a number of Executor IDs on the cluster. I would like to access both the executor ID and the Task/Attempt IDs from code inside a function running on a slave machine. Currently my motivation is to examine parallelism and locality, but in Hadoop this also allows code to write non-overlapping temporary files.
How do you force a Spark Application to run in multiple tasks
I have instrumented word count to track how many machines the code runs on. I use an accumulator to maintain a Set of MAC addresses. I find that everything is done on a single machine. This is probably optimal for word count, but not for the larger problems I am working on. How do I force processing to be split into multiple tasks? How do I access the task and attempt numbers to track which processing happens in which attempt? Also, is using the MAC address a reasonable way to determine which machine is running the code? As far as I can tell, a simple word count is running in one thread on one machine and the remainder of the cluster does nothing. This is consistent with tests where I write to stdout from functions and see little output on most machines in the cluster.
Re: How do you force a Spark Application to run in multiple tasks
The cluster runs Mesos, and I can see the tasks in the Mesos UI, but most are not doing much - any hints about that UI?

On Fri, Nov 14, 2014 at 11:39 AM, Daniel Siegmann daniel.siegm...@velos.io wrote: Most of the information you're asking for can be found on the Spark web UI (see here http://spark.apache.org/docs/1.1.0/monitoring.html). You can see which tasks are being processed by which nodes. If you're using HDFS and your file size is smaller than the HDFS block size, you will only have one partition (remember, there is exactly one task for each partition in a stage). If you want to force it to have more partitions, you can call RDD.repartition(numPartitions). Note that this will introduce a shuffle you wouldn't otherwise have. Also make sure your job is allocated more than one core in your cluster (you can see this on the web UI).

On Fri, Nov 14, 2014 at 2:18 PM, Steve Lewis lordjoe2...@gmail.com wrote: I have instrumented word count to track how many machines the code runs on. I use an accumulator to maintain a Set of MAC addresses. I find that everything is done on a single machine. This is probably optimal for word count, but not for the larger problems I am working on. How do I force processing to be split into multiple tasks? How do I access the task and attempt numbers to track which processing happens in which attempt? Also, is using the MAC address a reasonable way to determine which machine is running the code? As far as I can tell, a simple word count is running in one thread on one machine and the remainder of the cluster does nothing. This is consistent with tests where I write to stdout from functions and see little output on most machines in the cluster.

-- Daniel Siegmann, Software Developer Velos Accelerating Machine Learning 54 W 40th St, New York, NY 10018 E: daniel.siegm...@velos.io W: www.velos.io

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
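Daniel's block-size remark can be put in numbers: with HDFS-backed input, the first stage gets roughly one task per input split, about ceil(fileSize / blockSize) of them. A word-count input smaller than one block (commonly 64 or 128 MB in that era) therefore yields a single task no matter how many machines are idle. A sketch of that arithmetic (the helper is illustrative, not a Hadoop API):

```java
// Sketch: with HDFS input, split count (and hence first-stage task count)
// is roughly ceil(fileSize / blockSize). A file under one block gives a
// single split, so a single task, which matches the observed behaviour
// of everything running on one machine.
public class SplitCount {
    public static long minSplits(long fileSizeBytes, long blockSizeBytes) {
        return (fileSizeBytes + blockSizeBytes - 1) / blockSizeBytes; // ceiling
    }
}
```

Hence the two fixes in the reply: make the input larger than a block, or call repartition(n) to force more partitions explicitly.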
How can my java code executing on a slave find the task id?
I am trying to determine how effective partitioning is at parallelizing my tasks. So far I suspect that all the work is done in one task. My plan is to create a number of accumulators - one for each task - and have functions increment the accumulator for the appropriate task (or slave); the values could then tell me how balanced the computation is. But I am not sure how to access information about the slave. Any bright ideas?
How (in Java) do I create an Accumulator of type Long
JavaSparkContext currentContext = ...;
Accumulator<Integer> accumulator = currentContext.accumulator(0, "MyAccumulator");

will create an Accumulator of Integers. For many large data problems Integer is too small and Long is a better type. I see a call like the following:

AccumulatorParam<Long> param = ???; // how do I get one of these?
Accumulator<Long> accumulatorLong = currentContext.accumulator(new Long(0), "acc", param);

but the compiler is unhappy with this call, and there is NONE - ZERO - documentation on its use, on how to get an AccumulatorParam<Long>, or on how to turn one into an Accumulator. Any ideas?
Re: How (in Java) do I create an Accumulator of type Long
I see Javadoc-style documentation but nothing that looks like a code sample. I tried the following before asking:

public static class LongAccumulableParam implements AccumulableParam<Long, Long>, Serializable {
    @Override
    public Long addAccumulator(final Long r, final Long t) { return r + t; }
    @Override
    public Long addInPlace(final Long r1, final Long r2) { return r1 + r2; }
    @Override
    public Long zero(final Long initialValue) { return 0L; }
}

currentContext.accumulator(0L, "myAccumulator", new LongAccumulableParam());

does not compile, which is why I ask for code samples. The Javadoc for accumulator https://spark.apache.org/docs/1.0.0/api/java/org/apache/spark/api/java/JavaSparkContext.html#accumulator(T, org.apache.spark.AccumulatorParam) says:

accumulator(T initialValue, AccumulatorParam<T> accumulatorParam)
Create an Accumulator variable of a given type, which tasks can add values to using the add method.

which is a LONG way from a working code sample.

On Wed, Nov 12, 2014 at 8:18 PM, Sean Owen so...@cloudera.com wrote: It's the exact same API you've already found, and it's documented: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.AccumulatorParam JavaSparkContext has helper methods for int and double but not long. You can just make your own little implementation of AccumulatorParam<Long>, right? ... which would be nice to add to JavaSparkContext.

On Wed, Nov 12, 2014 at 11:05 PM, Steve Lewis lordjoe2...@gmail.com wrote: JavaSparkContext currentContext = ...; Accumulator<Integer> accumulator = currentContext.accumulator(0, "MyAccumulator"); will create an Accumulator of Integers. For many large data problems Integer is too small and Long is a better type. I see a call like the following: AccumulatorParam<Long> param = ???; // how do I get one of these? Accumulator<Long> accumulatorLong = currentContext.accumulator(new Long(0), "acc", param); but the compiler is unhappy with this call, and there is NONE - ZERO - documentation on its use, on how to get an AccumulatorParam<Long>, or on how to turn one into an Accumulator. Any ideas?

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
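Note the type mismatch in the attempt above: the three-argument accumulator call wants an AccumulatorParam, not an AccumulableParam. For reference, the logic such a parameter object needs is just Long arithmetic. The class below carries only that logic; since the Spark jar is not assumed here it does not literally declare `implements AccumulatorParam<Long>` (shown commented out), which is what the real version passed to currentContext.accumulator(...) would do:

```java
// Sketch of the Long parameter object Sean suggests writing yourself.
// In real Spark 1.x Java code this would declare
//     implements org.apache.spark.AccumulatorParam<Long>
// (kept as a comment here so the sketch compiles without the Spark jar).
public class LongAccumulatorParam /* implements AccumulatorParam<Long> */ {
    public Long addAccumulator(Long t1, Long t2) { return t1 + t2; }
    public Long addInPlace(Long r1, Long r2)     { return r1 + r2; }
    public Long zero(Long initialValue)          { return 0L; }
}
```

With the interface declared, an instance can be passed as the third argument of accumulator(0L, "name", param) to get an Accumulator<Long>.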
Is there a way to clone a JavaRDD without persisting it
In my problem I have a number of intermediate JavaRDDs and would like to be able to look at their sizes without destroying the RDD for subsequent processing. persist will do this, but these are big, persist seems expensive, and I am unsure which StorageLevel is needed. Is there a way to clone a JavaRDD, or does anyone have good ideas on how to do this?
How do I kill a job submitted with spark-submit
I see the job in the web interface but don't know how to kill it
Re: A Spark Design Problem
join seems to me the proper approach, followed by keying the fits by KeyID and using combineByKey to choose the best - I am implementing that now and will report on performance.

On Fri, Oct 31, 2014 at 11:56 AM, Sonal Goyal sonalgoy...@gmail.com wrote: Does the following help? JavaPairRDD<bin, key> join with JavaPairRDD<bin, lock>. If you partition both RDDs by the bin id, I think you should be able to get what you want. Best Regards, Sonal Nube Technologies http://www.nubetech.co http://in.linkedin.com/in/sonalgoyal

On Fri, Oct 31, 2014 at 5:44 PM, Steve Lewis lordjoe2...@gmail.com wrote: The original problem is in biology but the following captures the CS issues. Assume ...
A Spark Design Problem
The original problem is in biology but the following captures the CS issues. Assume I have a large number of locks and a large number of keys. There is a scoring function between keys and locks, and a key that fits a lock will have a high score. There may be many keys fitting one lock, and a key may fit no locks well. The object is to find the best fitting lock for each key. Assume that the number of keys and locks is high enough that taking the Cartesian product of the two is computationally impractical. Also assume that keys and locks have an attached location which is accurate within an error (say 1 km). Only keys and locks within 1 km need be compared. Now assume I can create a JavaRDD<Key> and a JavaRDD<Lock>. I could divide the locations into 1 km square bins and look only within a few bins. Assume that it is practical to take a Cartesian product for all elements in a bin but not to keep all elements in memory. I could map my RDDs into PairRDDs where the key is the bin assigned by location. I know how to take the Cartesian product of two JavaRDDs but not how to take a Cartesian product of sets of elements sharing a common key (bin). Any suggestions? Assume that in the worst cases the number of elements in a bin is too large to keep in memory, although if a bin were subdivided into, say, 100 sub-bins the elements would fit in memory. Any thoughts as to how to attack the problem?
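One detail worth making concrete: a key near a bin edge may fit a lock in an adjacent bin, so keys are usually emitted into their own bin plus the eight neighbors, making every needed comparison local to one bin. A minimal sketch of the binning arithmetic; the class and method names are my own, and locations are assumed to be in meters from some origin:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: map a location to a 1 km x 1 km bin id so keys
// and locks can be keyed by bin and joined or grouped bin-by-bin.
public class GridBins {
    private static final double BIN_SIZE_METERS = 1000.0;

    /** Combine the x and y cell indices into a single string key. */
    public static String binId(double xMeters, double yMeters) {
        long bx = (long) Math.floor(xMeters / BIN_SIZE_METERS);
        long by = (long) Math.floor(yMeters / BIN_SIZE_METERS);
        return bx + ":" + by;
    }

    /** The 9 bins (own plus 8 neighbors) a key must be compared against,
     *  since a lock within 1 km may sit in an adjacent bin. */
    public static List<String> neighborBins(double xMeters, double yMeters) {
        long bx = (long) Math.floor(xMeters / BIN_SIZE_METERS);
        long by = (long) Math.floor(yMeters / BIN_SIZE_METERS);
        List<String> out = new ArrayList<String>();
        for (long dx = -1; dx <= 1; dx++)
            for (long dy = -1; dy <= 1; dy++)
                out.add((bx + dx) + ":" + (by + dy));
        return out;
    }
}
```

Keys would then be flat-mapped to (binId, key) pairs over their neighborBins, locks mapped to (binId, lock) by binId alone, and the two joined or cogrouped by bin.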
Questions about serialization and SparkConf
Assume in my executor I say:

SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.kryo.registrator", "com.lordjoe.distributed.hydra.HydraKryoSerializer");
sparkConf.set("mysparc.data", "Some user Data");
sparkConf.setAppName("Some App");

Now:
1) Are there default values set in some system file which are populated if I call new SparkConf - if not, how do I get those? I think I see defaults for the master, the Serializer...
2) If I set a property in SparkConf for my SparkContext, will I see that property on a slave machine?
3) If I set a property and then call showSparkProperties(), do I see that property set? If not, how can I see the property set - say in another thread, as in showSparkPropertiesInAnotherThread()?
4) How can a slave machine access properties set on the executor? I am really interested in sparkConf.set("spark.kryo.registrator", "com.lordjoe.distributed.hydra.HydraKryoSerializer"), which needs to be used by the slave.

/**
 * dump all spark properties to System.err
 */
public static void showSparkProperties() {
    SparkConf sparkConf = new SparkConf();
    Tuple2<String, String>[] all = sparkConf.getAll();
    for (Tuple2<String, String> prp : all) {
        System.err.println(prp._1() + "=" + prp._2());
    }
}

public static void showSparkPropertiesInAnotherThread() {
    new Thread(new Runnable() {
        @Override
        public void run() {
            showSparkProperties();
        }
    }).start();
}
com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
A cluster I am running on keeps getting KryoException. Unlike the Java serializer, the Kryo exception gives no clue as to what class is giving the error. The application runs properly locally but not on the cluster, and I have my own custom KryoRegistrator and register several dozen classes - essentially everything I can find which implements Serializable. How do I find what the KryoSerializer issue is? I would love to see a list of all classes Kryo serialized
Re: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
I wrote a custom class loader to find all classes that were loaded that implement Serializable. I ran it locally to load all classes and registered ALL of these - I still get these issues.

On Tue, Oct 28, 2014 at 8:02 PM, Ganelin, Ilya ilya.gane...@capitalone.com wrote: Have you checked for any global variables in your scope? Remember that even if variables are not passed to the function they will be included as part of the context passed to the nodes. If you can't zen out what is breaking then try to simplify what you're doing. Set up a simple test call (like a map) with the same objects you're trying to serialize and see if those work. -Original Message- *From: *Steve Lewis [lordjoe2...@gmail.com] *Sent: *Tuesday, October 28, 2014 10:46 PM Eastern Standard Time *To: *user@spark.apache.org *Subject: *com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994 A cluster I am running on keeps getting KryoException. Unlike the Java serializer, the Kryo exception gives no clue as to what class is giving the error. The application runs properly locally but not on the cluster, and I have my own custom KryoRegistrator and register several dozen classes - essentially everything I can find which implements Serializable. How do I find what the KryoSerializer issue is? I would love to see a list of all classes Kryo serialized -
Re: How do you write a JavaRDD into a single file
Collect will store the entire output in a List in memory. This solution is acceptable for Little Data problems although if the entire problem fits in the memory of a single machine there is less motivation to use Spark. Most problems which benefit from Spark are large enough that even the data assigned to a single partition will not fit into memory. In my special case the output now is in the 0.5 - 4 GB range but in the future might get to 4 times that size - something a single machine could write but not hold at one time. I find that for most problems a file like Part-0001 is not what the next step wants to use - the minute a step is required to further process that file - even move and rename - there is little reason not to let the spark code write what is wanted in the first place. I like the solution of using toLocalIterator and writing my own file
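The toLocalIterator approach reduces to a small piece of plain Java; in real code the iterator argument would come from rdd.toLocalIterator(), which pulls one partition at a time to the driver, so no more than one partition need be in memory. A sketch, with names of my own choosing:

```java
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;

public class SingleFileWriter {
    /** Stream strings to one file, one per line, never holding more
     *  than the current element (plus buffering) in memory. In Spark
     *  the iterator would be rdd.toLocalIterator(). */
    public static void writeLines(Iterator<String> lines, File out) throws IOException {
        PrintWriter w = new PrintWriter(out);
        try {
            while (lines.hasNext())
                w.println(lines.next());
        } finally {
            w.close(); // always release the file handle
        }
    }
}
```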
How do you write a JavaRDD into a single file
At the end of a set of computation I have a JavaRDD<String>. I want a single file where each string is printed in order. The data is small enough that it is acceptable to handle the printout on a single processor. It may be large enough that using collect to generate a list might be unacceptable. The saveAsText command creates multiple files with names like part0000, part0001. This was bad behavior in Hadoop for final output and is also bad for Spark. A more general issue is whether it is possible to convert a JavaRDD into an iterator or iterable over the entire data set without using collect or holding all data in memory. In many problems where it is desirable to parallelize intermediate steps but use a single process for handling the final result, this could be very useful.
Re: How do you write a JavaRDD into a single file
Sorry I missed the discussion - although it did not answer the question. In my case (and I suspect the asker's) the 100 slaves are doing a lot of useful work but the generated output is small enough to be handled by a single process. Many of the large data problems I have worked on process a lot of data but end up with a single report file - frequently in a format specified by preexisting downstream code. I do not want a separate hadoop merge step, for a lot of reasons starting with better control of the generation of the file. However, toLocalIterator is exactly what I need. Somewhat off topic - I am being overwhelmed by getting a lot of emails from the list - is there a way to get a daily summary, which might be a lot easier to keep up with?

On Mon, Oct 20, 2014 at 3:23 PM, Sean Owen so...@cloudera.com wrote: This was covered a few days ago: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-write-a-RDD-into-One-Local-Existing-File-td16720.html The multiple output files is actually essential for parallelism, and certainly not a bad idea. You don't want 100 distributed workers writing to 1 file in 1 place, not if you want it to be fast. RDD and JavaRDD already expose a method to iterate over the data, called toLocalIterator. It does not require that the RDD fit entirely in memory. On Mon, Oct 20, 2014 at 6:13 PM, Steve Lewis lordjoe2...@gmail.com wrote: At the end of a set of computation I have a JavaRDD<String>. I want a single file where each string is printed in order. The data is small enough that it is acceptable to handle the printout on a single processor. It may be large enough that using collect to generate a list might be unacceptable. The saveAsText command creates multiple files with names like part0000, part0001. This was bad behavior in Hadoop for final output and is also bad for Spark.
A more general issue is whether it is possible to convert a JavaRDD into an iterator or iterable over the entire data set without using collect or holding all data in memory. In many problems where it is desirable to parallelize intermediate steps but use a single process for handling the final result, this could be very useful. -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
How do I get at a SparkContext, or better a JavaSparkContext, from the middle of a function
I am running a couple of functions on an RDD which require access to data on the file system known to the context. If I create a class with a context as a member variable I get a serialization error. So I am running my function on some slave and I want to read in data from a Path defined by a string, easy to read from my driver program. Later in the program the same issue arises in writing data, except I would like to access a task attempt string so different attempts do not write the same file. How can a piece of Java code running inside a function on some slave get at the Task?
Broadcast Torrent fail - then the job dies
I am running on Windows 8 using Spark 1.1.0 in local mode with Hadoop 2.2 - I repeatedly see the following in my logs. I believe this happens in combineByKey 14/10/08 09:36:30 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3) 14/10/08 09:36:30 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR broadcast.TorrentBroadcast: Reading broadcast variable 0 failed 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 5.006378813 s 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) -
Re: Broadcast Torrent fail - then the job dies
That converts the error to the following 14/10/08 13:27:40 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3) 14/10/08 13:27:40 INFO broadcast.HttpBroadcast: Started reading broadcast variable 0 14/10/08 13:27:40 ERROR executor.Executor: Exception in task 1.0 in stage 0.0 (TID 1) java.io.FileNotFoundException: http://192.168.1.4:54221/broadcast_0 at sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1610) at org.apache.spark.broadcast.HttpBroadcast$.org$apache$spark$broadcast$HttpBroadcast$$read(HttpBroadcast.scala:197) at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:89) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:991) Curiously the error is very repeatable on a relatively large and complex program I am running but the same Spark steps work well when the Objects are Strings and Integers like word count. My objects are complex but Serialize well and run when I drop a combineByKey step On Wed, Oct 8, 2014 at 12:00 PM, Liquan Pei liquan...@gmail.com wrote: Hi Lewis, For debugging purpose, can you try using HttpBroadCast to see if the error remains? You can enable HttpBroadCast by setting spark.broadcast.factory to org.apache.spark.broadcast.HttpBroadcastFactory in spark conf. Thanks, Liquan On Wed, Oct 8, 2014 at 11:21 AM, Steve Lewis lordjoe2...@gmail.com wrote: I am running on Windows 8 using Spark 1.1.0 in local mode with Hadoop 2.2 - I repeatedly see the following in my logs. 
I believe this happens in combineByKey 14/10/08 09:36:30 INFO executor.Executor: Running task 3.0 in stage 0.0 (TID 3) 14/10/08 09:36:30 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR broadcast.TorrentBroadcast: Reading broadcast variable 0 failed 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Reading broadcast variable 0 took 5.006378813 s 14/10/08 09:36:35 INFO broadcast.TorrentBroadcast: Started reading broadcast variable 0 14/10/08 09:36:35 ERROR executor.Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) - -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
anyone else seeing something like https://issues.apache.org/jira/browse/SPARK-3637
java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) at java.lang.Thread.run(Thread.java:722) My spark application running on Windows 8 keeps crashing with this error and I find no work around
Re: Spark and Python using generator of data bigger than RAM as input to sc.parallelize()
Try a Hadoop custom InputFormat - I can give you some samples. While I have not tried this, an input split has only a length (which could be ignored if the format treats the input as non-splittable) and a String for a location. If the location is a URL into Wikipedia the whole thing should work. Hadoop InputFormats seem to be the best way to get large (say multi-gigabyte) files into RDDs
What can be done if a FlatMapFunction generates more data than can be held in memory
A number of the problems I want to work with generate datasets which are too large to hold in memory. This becomes an issue when building a FlatMapFunction and also when the data used in combineByKey cannot be held in memory. The following is a simple, if a little silly, example of a FlatMapFunction returning maxMultiples multiples of a long. It works well for maxMultiples = 1000, but what happens if maxMultiples = 10 billion? The issue is that call cannot return a List or any other structure which is held in memory. What can it return, or is there another way to do this?

public static class GenerateMultiples implements FlatMapFunction<Long, Long> {
    private final long maxMultiples;

    public GenerateMultiples(final long maxMultiples) {
        this.maxMultiples = maxMultiples;
    }

    public Iterable<Long> call(Long l) {
        List<Long> holder = new ArrayList<Long>();
        for (long factor = 1; factor <= maxMultiples; factor++) {
            holder.add(l * factor);
        }
        return holder;
    }
}
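One answer, offered as a sketch rather than a definitive fix: call is declared to return Iterable, not List, so the multiples can be generated lazily. The class below is pure Java with no Spark types so it compiles stand-alone; in the real FlatMapFunction it would be the value returned by call, and it would also need to be Serializable:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Lazily yields base*1, base*2, ..., base*maxMultiples without ever
 *  holding more than the current value in memory. */
public class Multiples implements Iterable<Long> {
    private final long base;
    private final long maxMultiples;

    public Multiples(long base, long maxMultiples) {
        this.base = base;
        this.maxMultiples = maxMultiples;
    }

    public Iterator<Long> iterator() {
        return new Iterator<Long>() {
            private long factor = 1; // next multiplier to emit

            public boolean hasNext() { return factor <= maxMultiples; }

            public Long next() {
                if (!hasNext()) throw new NoSuchElementException();
                return base * factor++;
            }

            public void remove() { throw new UnsupportedOperationException(); }
        };
    }
}
```

Whether a downstream Spark stage can consume such an iterable without itself materializing the data depends on what the stage does with it, but the flat map step at least stops being the memory bottleneck.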
A sample for generating big data - and some design questions
This sample below is essentially word count modified to be big data by turning lines into groups of upper case letters and then generating all case variants - it is modeled after some real problems in biology. The issue is I know how to do this in Hadoop, but in Spark the use of a List in my flatMap function will not work as the size grows, and I don't know what will, or how not to keep data in memory. Anyone want to look at the sample and tell me how, on my machine given 8g, it does "Variant Size 18 Size 14188672 took 406 sec" and stalls with larger cases?
==
import org.apache.spark.*;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.storage.*;
import scala.*;

import java.util.*;

/**
 * com.lordjoe.distributed.test.JavaBigDataWordCount
 * This sample is written to force a sample with large amounts of data emulating some big data aspects of
 * a problem in biology I am working on -
 * <p/>
 * This is essentially WordCount
 * except that lines are filtered to just upper case words
 * then broken into String groups and all variants with different case are generated
 * so THE -> THE,ThE,THe,The,tHE,thE,tHe,the
 * when the groups get long - say 10 or 20 - a LOT of variants are generated
 * <p/>
 * This sample is motivated by real problems in biology where we want to look at possible mutations in DNA fragments or
 * possible chemical modifications on amino acids in polypeptides - my largest Hadoop job does exactly that
 * <p/>
 * I have serious questions about
 * A - How to write the FlatMapFunction CaseVariationFunction as the output gets large - I think storing results in a List will not work
 * - what are other options
 * B - are there other ways to do this
 */
public final class JavaBigDataWordCount {

    /**
     * drop all characters that are not letters
     *
     * @param s input string
     * @return output string
     */
    public static String dropNonLetters(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (Character.isLetter(c))
                sb.append(c);
        }
        return sb.toString();
    }

    /**
     * convert a string into a string holding only upper case letters
     *
     * @param inp input string
     * @return output string
     */
    public static String regularizeString(String inp) {
        inp = inp.trim();
        inp = inp.toUpperCase();
        return dropNonLetters(inp);
    }

    /**
     * convert a string into strings of length maxLength, all letters and
     * upper case
     */
    public static class SubstringsMapFunction implements FlatMapFunction<String, String> {
        private final int maxLength;

        public SubstringsMapFunction(final int pMaxLength) {
            maxLength = pMaxLength;
        }

        public Iterable<String> call(String s) {
            s = regularizeString(s); // drop non-letters
            List<String> holder = new ArrayList<String>();
            for (int i = 0; i < s.length() - 2; i += maxLength) {
                holder.add(s.substring(i, Math.min(s.length(), i + maxLength)));
            }
            return holder;
        }
    }

    /**
     * return all cases of an upper case string so THE -> THE,ThE,THe,The,tHE,thE,tHe,the
     * In general the output is 2 to the Nth long where N is the input length
     */
    public static class CaseVariationFunction implements FlatMapFunction<String, String> {
        public Iterable<String> call(String s) {
            // HELP - I don't think a List will work for long inputs - WHAT else can I use
            List<String> holder = new ArrayList<String>(); // holds variants
            holder.add(s);
            makeVariations(s.toCharArray(), holder, 0); // do real work filling in holder
            return holder;
        }

        /**
         * add to holder - NOTE I think a List is wrong for large inputs
         *
         * @param chars  characters input
         * @param holder - holder - or iterable holding results
         * @param index  - start changing case at this position
         */
        private void makeVariations(char[] chars, final List<String> holder, int index) {
            if (index < chars.length - 1)
                makeVariations(chars, holder, index + 1);
            if (Character.isUpperCase(chars[index])) {
                chars[index] = Character.toLowerCase(chars[index]);
                holder.add(new String(chars));
                if (index < chars.length - 1)
                    makeVariations(chars, holder, index + 1);
                chars[index] = Character.toUpperCase(chars[index]);
            }
        }
    }

    // a few lines of text so we don't need to read a file if we don't want to
    public static final String GETTYSBURG = "Four score and seven years ago our fathers brought forth, upon this
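In answer to the HELP comment above, one option: since call returns an Iterable, the variants can be enumerated lazily with a bit counter instead of collected in a List - variant i lower-cases exactly the positions whose bit is set in i. A stand-alone sketch with no Spark types (in the real CaseVariationFunction this would be the return value of call, and it would also need to be Serializable); it assumes an all-upper-case input of at most 62 letters so the count fits in a long:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Lazily enumerates every upper/lower case variant of an upper-case
 *  word, producing the 2^length variants one at a time. */
public class CaseVariants implements Iterable<String> {
    private final String upper;

    public CaseVariants(String upper) { this.upper = upper; }

    public Iterator<String> iterator() {
        return new Iterator<String>() {
            private long index = 0;                        // which variant is next
            private final long count = 1L << upper.length(); // 2^length variants total

            public boolean hasNext() { return index < count; }

            public String next() {
                if (!hasNext()) throw new NoSuchElementException();
                char[] chars = upper.toCharArray();
                // lower-case exactly the positions whose bit is set in index
                for (int pos = 0; pos < chars.length; pos++)
                    if ((index & (1L << pos)) != 0)
                        chars[pos] = Character.toLowerCase(chars[pos]);
                index++;
                return new String(chars);
            }

            public void remove() { throw new UnsupportedOperationException(); }
        };
    }
}
```

Memory use is constant regardless of the word length, at the cost of re-copying the character array per variant.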
Re: Does anyone have experience with using Hadoop InputFormats?
I tried newAPIHadoopFile and it works, except that my original InputFormat extends InputFormat<Text, Text> and has a RecordReader<Text, Text>. This throws a NotSerializableException on Text - changing the type to InputFormat<StringBuffer, StringBuffer> works with minor code changes. I do not, however, believe that Hadoop can use an InputFormat with types not derived from Writable. What were you using, and was it able to work with Hadoop?

On Tue, Sep 23, 2014 at 5:52 PM, Liquan Pei liquan...@gmail.com wrote: Hi Steve, Did you try the newAPIHadoopFile? That worked for us. Thanks, Liquan On Tue, Sep 23, 2014 at 5:43 PM, Steve Lewis lordjoe2...@gmail.com wrote: Well I had one and tried that - my message tells what I found: 1) Spark only accepts org.apache.hadoop.mapred.InputFormat<K,V>, not org.apache.hadoop.mapreduce.InputFormat<K,V> 2) Hadoop expects K and V to be Writables - I always use Text - Text is not Serializable and will not work with Spark - StringBuffer will work with Spark but not (as far as I know) with Hadoop. Telling me what the documentation SAYS is all well and good, but I just tried it and want to hear from people with real examples working. On Tue, Sep 23, 2014 at 5:29 PM, Liquan Pei liquan...@gmail.com wrote: Hi Steve, Here is my understanding: as long as you implement InputFormat, you should be able to use the hadoopFile API in SparkContext to create an RDD. Suppose you have a customized InputFormat, which we call CustomizedInputFormat<K, V> where K is the key type and V is the value type. You can create an RDD with CustomizedInputFormat in the following way: let sc denote the SparkContext variable and path denote the path to the file for CustomizedInputFormat; we use val rdd: RDD[(K,V)] = sc.hadoopFile[K,V,CustomizedInputFormat](path, classOf[CustomizedInputFormat], classOf[K], classOf[V]) to create an RDD of (K,V) with CustomizedInputFormat.
Hope this helps, Liquan On Tue, Sep 23, 2014 at 5:13 PM, Steve Lewis lordjoe2...@gmail.com wrote: When I experimented with using an InputFormat I had used in Hadoop for a long time, I found: 1) it must extend org.apache.hadoop.mapred.FileInputFormat (the deprecated class, not org.apache.hadoop.mapreduce.lib.input.FileInputFormat) 2) initialize needs to be called in the constructor 3) the type - mine was extends FileInputFormat<Text, Text> - must not be a Hadoop Writable - those are not Serializable - but extends FileInputFormat<StringBuffer, StringBuffer> does work, and I don't think that is allowed in Hadoop. Are these statements correct? If so, it seems like most Hadoop InputFormats - certainly the custom ones I create - require serious modifications to work. Does anyone have samples of use of a Hadoop InputFormat? Since I am working with problems where a directory with multiple files is processed, and some files are many gigabytes in size with multiline complex records, an input format is a requirement. -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com -- Liquan Pei Department of Physics University of Massachusetts Amherst -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: Does anyone have experience with using Hadoop InputFormats?
Do your custom Writable classes implement Serializable - I think that is the only real issue - my code uses vanilla Text
Re: Does anyone have experience with using Hadoop InputFormats?
Hmmm - I have only tested in local mode but I got an java.io.NotSerializableException: org.apache.hadoop.io.Text at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1180) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1528) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1493) Here are two classes - one will work one will not the mgf file is what they read showPairRDD simply print the text read guaranteeSparkMaster calls sparkConf.setMaster(local); if there is no master defined Perhaps I need to convert Text somewhere else but I certainly don't see where package com.lordjoe.distributed.input; /** * com.lordjoe.distributed.input.MGFInputFormat * User: Steve * Date: 9/24/2014 */ import org.apache.hadoop.conf.*; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.*; import org.apache.hadoop.io.compress.*; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.*; import org.apache.hadoop.util.*; import java.io.*; /** * org.systemsbiology.hadoop.MGFInputFormat * Splitter that reads mgf files * nice enough to put the begin and end tags on separate lines */ public class MGFInputFormat extends FileInputFormatStringBuffer, StringBuffer implements Serializable { private String m_Extension = mgf; public MGFInputFormat() { } @SuppressWarnings(UnusedDeclaration) public String getExtension() { return m_Extension; } @SuppressWarnings(UnusedDeclaration) public void setExtension(final String pExtension) { m_Extension = pExtension; } @Override public RecordReaderStringBuffer, StringBuffer createRecordReader(InputSplit split, TaskAttemptContext context) { return new MGFFileReader(); } @Override protected boolean isSplitable(JobContext context, Path file) { final String lcName = file.getName().toLowerCase(); //noinspection RedundantIfStatement if (lcName.endsWith(gz)) return false; return true; } /** * Custom RecordReader which returns the entire file as a * single value with the name as 
a key * Value is the entire file * Key is the file name */ public class MGFFileReader extends RecordReaderStringBuffer, StringBuffer implements Serializable { private CompressionCodecFactory compressionCodecs = null; private long m_Start; private long m_End; private long current; private LineReader m_Input; FSDataInputStream m_RealFile; private StringBuffer key = null; private StringBuffer value = null; private Text buffer; // must be public Text getBuffer() { if(buffer == null) buffer = new Text(); return buffer; } public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException { FileSplit split = (FileSplit) genericSplit; Configuration job = context.getConfiguration(); m_Start = split.getStart(); m_End = m_Start + split.getLength(); final Path file = split.getPath(); compressionCodecs = new CompressionCodecFactory(job); boolean skipFirstLine = false; final CompressionCodec codec = compressionCodecs.getCodec(file); // open the file and seek to the m_Start of the split FileSystem fs = file.getFileSystem(job); // open the file and seek to the m_Start of the split m_RealFile = fs.open(split.getPath()); if (codec != null) { CompressionInputStream inputStream = codec.createInputStream(m_RealFile); m_Input = new LineReader( inputStream ); m_End = Long.MAX_VALUE; } else { if (m_Start != 0) { skipFirstLine = true; --m_Start; m_RealFile.seek(m_Start); } m_Input = new LineReader( m_RealFile); } // not at the beginning so go to first line if (skipFirstLine) { // skip first line and re-establish m_Start. m_Start += m_Input.readLine(getBuffer()) ; } current = m_Start; if (key == null) { key = new StringBuffer(); } else { key.setLength(0); } key.append(split.getPath().getName()); if (value == null) { value = new StringBuffer(); } current = 0; } /** * look for a scan tag then read until it closes * * @return true if there is data * @throws java.io.IOException */ public boolean nextKeyValue() throws
Has anyone seen java.nio.ByteBuffer.wrap(ByteBuffer.java:392)
java.lang.NullPointerException at java.nio.ByteBuffer.wrap(ByteBuffer.java:392) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58) at org.apache.spark.scheduler.Task.run(Task.scala:54) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:199) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603) I am running on local master - performing a flatMap on an RDD which looks right if I collect it I never hit code in my map function and not a single line in the stack references something in my code base Anyone ever seen this or know what to do
Does anyone have experience with using Hadoop InputFormats?
When I experimented with using an InputFormat I had used in Hadoop for a long time, I found: 1) it must extend org.apache.hadoop.mapred.FileInputFormat (the deprecated class, not org.apache.hadoop.mapreduce.lib.input.FileInputFormat) 2) initialize needs to be called in the constructor 3) the type - mine was extends FileInputFormat<Text, Text> - must not be a Hadoop Writable - those are not Serializable - but extends FileInputFormat<StringBuffer, StringBuffer> does work, and I don't think that is allowed in Hadoop. Are these statements correct? If so, it seems like most Hadoop InputFormats - certainly the custom ones I create - require serious modifications to work. Does anyone have samples of use of a Hadoop InputFormat? Since I am working with problems where a directory with multiple files is processed, and some files are many gigabytes in size with multiline complex records, an input format is a requirement.
Is there any way (in Java) to make a JavaRDD from an iterable
The only way I find is to turn it into a list - in effect holding everything in memory (see code below). Surely Spark has a better way. Also what about unterminated iterables like a Fibonacci series (useful only if limited in some other way)?

/**
 * make an RDD from an iterable
 * @param inp input iterable
 * @param ctx context
 * @param <T> type
 * @return rdd from iterable as a list
 */
public static @Nonnull <T> JavaRDD<T> fromIterable(@Nonnull final Iterable<T> inp, @Nonnull final JavaSparkContext ctx) {
    List<T> holder = new ArrayList<T>();
    for (T k : inp) {
        holder.add(k);
    }
    return ctx.parallelize(holder);
}
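One partial workaround, offered as an assumption rather than an established pattern: cut the iterable into batches lazily, call ctx.parallelize on each batch as it is produced, and fold the resulting RDDs together with JavaSparkContext.union, so only one batch is materialized at a time. The pure-Java batching part, with the Spark calls left out (an unterminated iterable still needs an explicit limit):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Lazily cut an iterator into fixed-size batches; each batch would be
 *  handed to ctx.parallelize(batch) and the RDDs combined with union. */
public class Batches {
    public static <T> Iterator<List<T>> batches(final Iterator<T> inp, final int batchSize) {
        return new Iterator<List<T>>() {
            public boolean hasNext() { return inp.hasNext(); }

            public List<T> next() {
                // drain up to batchSize elements from the source
                List<T> batch = new ArrayList<T>(batchSize);
                while (inp.hasNext() && batch.size() < batchSize)
                    batch.add(inp.next());
                return batch;
            }

            public void remove() { throw new UnsupportedOperationException(); }
        };
    }
}
```

The driver still sees every element once, but never more than one batch at a time; whether the accumulated union of many small RDDs performs well is a separate question I have not measured.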
Re: Reproducing the function of a Hadoop Reducer
OK so in Java - pardon the verbosity - I might say something like the code below, but I face the following issues: 1) I need to store all values in memory as I run combineByKey - if I could return an RDD which consumed values that would be great, but I don't know how to do that. 2) In my version of the functions I get a tuple, so I know the key, but all of Scala's functions for byKey do not make the key available - this may work for a trivial function like wordcount, but the code I want to port needs to know the key when processing values. 3) It is important that I have control over partitioning - I can do that with mapPartitions - but it is also important that within a partition keys be received in sorted order - easy if every partition could be a separate RDD, combined later, but I am not sure how that works. In Hadoop, when I reduce the values for each key I get an iterator and do not need to keep all values in memory. Similarly, while the output in Hadoop is written to disk as key-value pairs, in Spark it could populate a JavaPairRDD if there were a way to do that lazily. One other issue - I don't see a good way to say a merge function is finished - i.e. no further data is coming in - which would be useful in processing steps.
/**
 * a class to store a key and all its values
 * using an array list
 * @param <K> key type
 * @param <V> value type
 */
public static class KeyAndValues<K, V> {
    public final K key;
    private final ArrayList<V> values = new ArrayList<V>();

    public KeyAndValues(final K pKey) {
        key = pKey;
    }

    public void addValue(V added) {
        values.add(added);
    }

    public Iterable<V> getIterable() {
        return values;
    }

    public KeyAndValues<K, V> merge(KeyAndValues<K, V> merged) {
        values.addAll(merged.values);
        return this;
    }
}

// start function for combine by key - gets key from first tuple
public static class CombineStartKeyAndValues<K, V> implements Function<Tuple2<K, V>, KeyAndValues<K, V>> {
    public KeyAndValues<K, V> call(Tuple2<K, V> x) {
        KeyAndValues<K, V> ret = new KeyAndValues<K, V>(x._1());
        ret.addValue(x._2());
        return ret;
    }
}

// continue function for combine by key - adds values to array
public static class CombineContinueKeyAndValues<K, V> implements Function2<KeyAndValues<K, V>, Tuple2<K, V>, KeyAndValues<K, V>> {
    public KeyAndValues<K, V> call(final KeyAndValues<K, V> kvs, final Tuple2<K, V> added) throws Exception {
        kvs.addValue(added._2());
        return kvs;
    }
}

// merge function - merges arrays - NOTE there is no signal to say merge is done
public static class CombineMergeKeyAndValues<K, V> implements Function2<KeyAndValues<K, V>, KeyAndValues<K, V>, KeyAndValues<K, V>> {
    public KeyAndValues<K, V> call(final KeyAndValues<K, V> v1, final KeyAndValues<K, V> v2) throws Exception {
        return v1.merge(v2); // was "return null" - the two partial results must be combined
    }
}

On Fri, Sep 19, 2014 at 11:19 PM, Victor Tso-Guillen v...@paxata.com wrote: So sorry about teasing you with the Scala. But the method is there in Java too, I just checked. On Fri, Sep 19, 2014 at 2:02 PM, Victor Tso-Guillen v...@paxata.com wrote: It might not be the same as a real hadoop reducer, but I think it would accomplish the same.
Take a look at:

import org.apache.spark.SparkContext._

// val rdd: RDD[(K, V)]
// def zero(value: V): S
// def reduce(agg: S, value: V): S
// def merge(agg1: S, agg2: S): S
val reducedUnsorted: RDD[(K, S)] = rdd.combineByKey[S](zero, reduce, merge)
reducedUnsorted.sortByKey()

On Fri, Sep 19, 2014 at 1:37 PM, Steve Lewis lordjoe2...@gmail.com wrote:
I am struggling to reproduce the functionality of a Hadoop reducer on Spark (in Java). In Hadoop I have a function

public void doReduce(K key, Iterator<V> values)

and in Hadoop there is also a consumer (context.write) which can be seen as consume(key, value). In my code:
1) knowing the key is important to the function
2) there is neither one output tuple2 per key nor one output tuple2 per value
3) the number of values per key might be large enough that storing them in memory is impractical
4) keys must appear in sorted order

One good example would run through a large document using a similarity function to look at the last 200 lines and output any of those with a similarity of more than 0.3 (do not suggest output all and filter - the real problem is more complex). The critical concern is an uncertain number of tuples per key.

My questions:
1) how can this be done - ideally a consumer would be a JavaPairRDD, but I don't see how to create one and add items later
2) how do I handle the entire partition, sort, process (involving calls to doReduce)

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
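One pattern that addresses the "no signal that a merge function is finished" issue (a sketch under stated assumptions, not something from this thread): treat the combiner output as unfinished state and do per-key finalization in a separate mapValues pass. mapValues only sees a key's value after combineByKey has delivered the fully merged result, so it acts as the missing "merge done" hook. This assumes the KeyAndValues class from the Java code above; the counting body is a stand-in for real finalization work.

```java
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function;

public class FinalizeAfterCombine {
    // mapValues runs once per key, after combineByKey has merged every
    // partial KeyAndValues, so it serves as the "merge is finished" hook
    public static JavaPairRDD<String, Integer> finish(
            JavaPairRDD<String, KeyAndValues<String, Integer>> combined) {
        return combined.mapValues(new Function<KeyAndValues<String, Integer>, Integer>() {
            public Integer call(KeyAndValues<String, Integer> kvs) {
                int count = 0; // stand-in for real per-key finalization
                for (Integer ignored : kvs.getIterable())
                    count++;
                return count;
            }
        });
    }
}
```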
Looking for a good sample of Using Spark to do things Hadoop can do
Assume I have a large book with many chapters and many lines of text. Assume I have a function that tells me the similarity of two lines of text. The objective is to find the most similar line in the same chapter within 200 lines of the line found. The real problem involves biology and is beyond this discussion.

In the code shown below I convert lines with location into a Tuple2 where location is the key. Now I want to partition by chapter (I think maybe that is right). Now for every chapter I want to look at lines in order of location. I want to keep the last 200 locations, search them to update the best fit, and for every line add a best fit (as a LineAndLocationMatch). When a line is over 200 away from the current line it can be added to the return JavaRDD.

I know how to do the map and generate doubles but not how to do the sort and reduce or even what the reduce function arguments look like. Please use Java functions - not lambdas - as a sample. I am a strong-typing guy - returning JavaRDDs shows me the type for a series of . operations and really helps me understand what is happening.

I expect my reduceFunction to look like

void reduceFunction(KeyClass key, Iterator<LineAndLocation> values)

but to have some way to accept the best fit LineAndLocationMatch generated as values are iterated. There is no reason to think that the number of objects will fit in memory. Also it is important for the function doing the reduce to know the key. I am very lost at what the reduce looks like. Under the covers the reduce involves a lot of Java code which knows very little about Spark and Hadoop.
My pseudo code looks like this - as far as I have working:

// one line in the book
static class LineAndLocation {
    int chapter;
    int lineNumber;
    String line;
}

// a line together with its best match
static class LineAndLocationMatch {
    LineAndLocation thisLine;
    LineAndLocation bestFit;
}

// location - acts as a key
static class KeyClass {
    int chapter;
    int lineNumber;

    KeyClass(final int pChapter, final int pLineNumber) {
        chapter = pChapter;
        lineNumber = pLineNumber;
    }
}

// used to compute the best fit
public class SimilarityFunction {
    double getSimilarity(String s1, String s2) {
        return 0; // todo do work here
    }
}

// This function returns an RDD with best match objects
public static JavaRDD<LineAndLocationMatch> findBestMatchesLikeHadoop(JavaRDD<LineAndLocation> inputs) {
    // So this is what the mapper does - make key value pairs
    JavaPairRDD<KeyClass, LineAndLocation> mappedKeys = inputs.mapToPair(new PairFunction<LineAndLocation, KeyClass, LineAndLocation>() {
        @Override
        public Tuple2<KeyClass, LineAndLocation> call(final LineAndLocation v) throws Exception {
            return new Tuple2<KeyClass, LineAndLocation>(new KeyClass(v.chapter, v.lineNumber), v);
        }
    });

    // Partition by chapters ?? is this right??
    mappedKeys = mappedKeys.partitionBy(new Partitioner() {
        @Override
        public int numPartitions() {
            return 20;
        }

        @Override
        public int getPartition(final Object key) {
            return ((KeyClass) key).chapter % numPartitions();
        }
    });

    // Now I get very fuzzy - for every partition I want to sort on line number
    JavaPairRDD<KeyClass, LineAndLocation> sortedKeys = ??? WHAT HAPPENS HERE

    // Now I need to do a reduce operation. What I want is
    JavaRDD<LineAndLocationMatch> bestMatches = sortedKeys.SOME FUNCTION();
    return bestMatches;
}
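The inner loop of the reduce - keep the last 200 lines of the current chapter and score each new line against them - does not need Spark at all, and can be prototyped in plain Java and later called from inside a mapPartitions over the sorted pairs. A minimal sketch under stated assumptions: Located stands in for LineAndLocation, and similarity() is a placeholder, not the real biology function.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

public class WindowMatcher {

    // hypothetical stand-in for the real similarity function:
    // fraction of positions where the two strings agree
    static double similarity(String a, String b) {
        int n = Math.min(a.length(), b.length());
        int same = 0;
        for (int i = 0; i < n; i++)
            if (a.charAt(i) == b.charAt(i))
                same++;
        int longest = Math.max(a.length(), b.length());
        return longest == 0 ? 0.0 : (double) same / longest;
    }

    // stand-in for LineAndLocation
    public static class Located {
        public final int location;
        public final String line;

        public Located(int location, String line) {
            this.location = location;
            this.line = line;
        }
    }

    // Lines must arrive in location order, as they would inside one sorted
    // partition. Returns, for each line, the location of its best earlier
    // match within the window, or -1 if nothing beats the threshold.
    // Memory use is bounded by windowSize, not by the partition size.
    public static List<Integer> bestMatches(Iterator<Located> lines, int windowSize, double threshold) {
        List<Integer> out = new ArrayList<Integer>();
        Deque<Located> window = new ArrayDeque<Located>();
        while (lines.hasNext()) {
            Located current = lines.next();
            int best = -1;
            double bestScore = threshold;
            for (Located earlier : window) {
                double s = similarity(earlier.line, current.line);
                if (s > bestScore) {
                    bestScore = s;
                    best = earlier.location;
                }
            }
            out.add(best);
            window.addLast(current);               // newest at the tail
            if (window.size() > windowSize)
                window.removeFirst();              // drop lines > windowSize away
        }
        return out;
    }
}
```

Inside Spark this loop would sit in a mapPartitions function whose iterator yields the partition's (KeyClass, LineAndLocation) pairs in sorted order, emitting a LineAndLocationMatch whenever a line falls out of the window.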
Re: Is the structure for a jar file for running Spark applications the same as that for Hadoop
In modern projects there are a bazillion dependencies - when I use Hadoop I just put them in a lib directory in the jar. If I have a project that depends on 50 jars I need a way to deliver them to Spark - maybe wordcount can be written without dependencies, but real projects need to deliver dependencies to the cluster.

On Wed, Sep 10, 2014 at 11:44 PM, Sean Owen so...@cloudera.com wrote:
Hm, so it is: http://docs.oracle.com/javase/tutorial/deployment/jar/downman.html I'm sure I've done this before though and thought it was this mechanism. It must be something custom. What's the Hadoop jar structure in question then? Is it something special like a WAR file? I confess I had never heard of this so thought this was about generic JAR stuff. Is the question about a lib dir in the Hadoop home dir?

On Sep 10, 2014 11:34 PM, Marcelo Vanzin van...@cloudera.com wrote:
On Mon, Sep 8, 2014 at 11:15 PM, Sean Owen so...@cloudera.com wrote:
This structure is not specific to Hadoop, but in theory works in any JAR file. You can put JARs in JARs and refer to them with Class-Path entries in META-INF/MANIFEST.MF.

Funny that you mention that, since someone internally asked the same question, and I spent some time looking at it. That's not actually how Class-Path works in the manifest. You can't have jars inside other jars; the Class-Path items reference things in the filesystem itself. So that solution doesn't work. It would be nice to add the feature Steve is talking about, though.
-- Marcelo

-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Is the structure for a jar file for running Spark applications the same as that for Hadoop
In a Hadoop jar there is a directory called lib, and all non-provided third party jars go there and are included in the classpath of the code. Do jars for Spark have the same structure? Another way to ask the question: if I have code to execute Spark and a jar built for Hadoop, can I simply use that jar?
Re: Mapping Hadoop Reduce to Spark
Assume I define a partitioner like

/**
 * partition on the first letter
 */
public class PartitionByStart extends Partitioner {
    @Override
    public int numPartitions() {
        return 26;
    }

    @Override
    public int getPartition(final Object key) {
        String s = (String) key;
        if (s.length() == 0)
            throw new IllegalStateException("problem"); // ToDo change
        int ret = s.charAt(0) - 'A';
        ret = Math.min(25, ret);
        ret = Math.max(0, ret);
        return 25 - ret;
    }
}

How, short of running on a large cluster, can I test that code - which might look like (unrolling all the chained methods)

ones = ones.partitionBy(new PartitionByStart());
JavaPairRDD<String, Integer> sorted = ones.sortByKey();
JavaRDD<WordNumber> answer = sorted.mapPartitions(new WordCountFlatMapFunction());

- partitions properly? In other words, on a local instance how would partitioning work and what do I expect to see in switching from one partition to another as the code runs?

On Sat, Aug 30, 2014 at 10:30 AM, Matei Zaharia matei.zaha...@gmail.com wrote:
In 1.1, you'll be able to get all of these properties using sortByKey, and then mapPartitions on top to iterate through the key-value pairs. Unfortunately sortByKey does not let you control the Partitioner, but it's fairly easy to write your own version that does if this is important. In previous versions, the values for each key had to fit in memory (though we could have data on disk across keys), and this is still true for groupByKey, cogroup and join. Those restrictions will hopefully go away in a later release. But sortByKey + mapPartitions lets you just iterate through the key-value pairs without worrying about this.
Matei

On August 30, 2014 at 9:04:37 AM, Steve Lewis (lordjoe2...@gmail.com) wrote:
When programming in Hadoop it is possible to guarantee that
1) All keys sent to a specific partition will be handled by the same machine (thread)
2) All keys received by a specific machine (thread) will be received in sorted order
3) These conditions will hold even if the values associated with a specific key are too large to fit in memory.
In my Hadoop code I use all of these conditions - specifically, with my larger data sets the size of data I wish to group exceeds the available memory. I think I understand the operation of groupBy, but my understanding is that this requires that the results for a single key, and perhaps all keys, fit on a single machine. Is there a way to perform like Hadoop and not require that an entire group fit in memory?
-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
I am looking for a Java sample of a Partitioner
Assume, say in JavaWordCount, I call the equivalent of a mapper:

JavaPairRDD<String, Integer> ones = words.mapToPair(...);

Now right here I want to guarantee that each word starting with a particular letter is processed in a specific partition (don't tell me this is a dumb idea - I know that - but in Hadoop code a custom partitioner is often important and I don't want to explain the real case). I have no idea how one would implement mapToPartition, but I want to emulate Hadoop with a custom partition and key sort order before calling

JavaPairRDD<String, Integer> counts = ones.reduceByKey(...);
I am looking for a Java sample of a Partitioner
Assume, say in JavaWordCount, I call the equivalent of a mapper:

JavaPairRDD<String, Integer> ones = words.mapToPair(...);

Now right here I want to guarantee that each word starting with a particular letter is processed in a specific partition (don't tell me this is a dumb idea - I know that - but in Hadoop code a custom partitioner is often important and I don't want to explain the real case). I have no idea how one would implement such partitioning here or what the code would look like assuming mapToPartition was used:

JavaPairRDD<String, Integer> counts = ones.reduceByKey(...);
Re: Mapping Hadoop Reduce to Spark
Is there a sample of how to do this? I see 1.1 is out but cannot find samples of mapPartitions. A Java sample would be very useful.

On Sat, Aug 30, 2014 at 10:30 AM, Matei Zaharia matei.zaha...@gmail.com wrote:
In 1.1, you'll be able to get all of these properties using sortByKey, and then mapPartitions on top to iterate through the key-value pairs. Unfortunately sortByKey does not let you control the Partitioner, but it's fairly easy to write your own version that does if this is important. In previous versions, the values for each key had to fit in memory (though we could have data on disk across keys), and this is still true for groupByKey, cogroup and join. Those restrictions will hopefully go away in a later release. But sortByKey + mapPartitions lets you just iterate through the key-value pairs without worrying about this.

Matei

On August 30, 2014 at 9:04:37 AM, Steve Lewis (lordjoe2...@gmail.com) wrote:
When programming in Hadoop it is possible to guarantee that
1) All keys sent to a specific partition will be handled by the same machine (thread)
2) All keys received by a specific machine (thread) will be received in sorted order
3) These conditions will hold even if the values associated with a specific key are too large to fit in memory.
In my Hadoop code I use all of these conditions - specifically, with my larger data sets the size of data I wish to group exceeds the available memory. I think I understand the operation of groupBy, but my understanding is that this requires that the results for a single key, and perhaps all keys, fit on a single machine. Is there a way to perform like Hadoop and not require that an entire group fit in memory?
-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
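A hedged Java sketch of the sortByKey + mapPartitions pattern suggested above, written against the 1.x Java API (word counting is stand-in data; the point is that after sortByKey each partition's iterator yields pairs in key order, so a key's values can be consumed one at a time):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;

import scala.Tuple2;

public class SortedPartitionReduce {
    // Consecutive tuples with the same key are reduced one at a time;
    // a key is "finished" the moment the key changes, like a Hadoop reducer.
    public static JavaRDD<String> reduceSorted(JavaPairRDD<String, Integer> pairs) {
        return pairs.sortByKey().mapPartitions(
            new FlatMapFunction<Iterator<Tuple2<String, Integer>>, String>() {
                public Iterable<String> call(Iterator<Tuple2<String, Integer>> it) {
                    List<String> out = new ArrayList<String>();
                    String currentKey = null;
                    int total = 0;
                    while (it.hasNext()) {
                        Tuple2<String, Integer> t = it.next();
                        if (currentKey != null && !currentKey.equals(t._1())) {
                            out.add(currentKey + "=" + total); // key changed: emit
                            total = 0;
                        }
                        currentKey = t._1();
                        total += t._2();
                    }
                    if (currentKey != null)
                        out.add(currentKey + "=" + total);     // flush last key
                    return out;
                }
            });
    }
}
```

Because equal keys arrive consecutively, the running state per key is a single accumulator rather than an in-memory list of all values, and the number of emitted records per key is entirely up to the function.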
Mapping Hadoop Reduce to Spark
When programming in Hadoop it is possible to guarantee that
1) All keys sent to a specific partition will be handled by the same machine (thread)
2) All keys received by a specific machine (thread) will be received in sorted order
3) These conditions will hold even if the values associated with a specific key are too large to fit in memory.
In my Hadoop code I use all of these conditions - specifically, with my larger data sets the size of data I wish to group exceeds the available memory. I think I understand the operation of groupBy, but my understanding is that this requires that the results for a single key, and perhaps all keys, fit on a single machine. Is there a way to perform like Hadoop and not require that an entire group fit in memory?
What happens if I have a function like a PairFunction but which might return multiple values
In many cases when I work with Map Reduce my mapper or my reducer might take a single value and map it to multiple keys - the reducer might also take a single key and emit multiple values. I don't think that functions like flatMap and reduceByKey will work - or are there tricks I am not aware of?
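The flat variants of the Java API appear to cover the mapper direction: flatMapToPair lets one input emit any number of key-value pairs, which is the Hadoop mapper case of multiple context.write() calls per record. A sketch against the Spark 1.x Java API (the word-splitting example is illustrative only):

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

public class MultiEmit {
    // One input line -> zero or more (word, 1) pairs, like a Hadoop
    // mapper that calls context.write() more than once per record.
    public static class WordPairs implements PairFlatMapFunction<String, String, Integer> {
        public Iterable<Tuple2<String, Integer>> call(String line) {
            List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
            for (String w : line.split(" ")) {
                if (w.length() > 0)
                    out.add(new Tuple2<String, Integer>(w, 1));
            }
            return out;
        }
    }

    public static JavaPairRDD<String, Integer> toPairs(JavaRDD<String> lines) {
        return lines.flatMapToPair(new WordPairs());
    }
}
```

For the reducer direction (one key, many output values), the sortByKey + mapPartitions route discussed on this list gives the same freedom, since the partition function decides how many records to emit per key.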
How do you hit breakpoints using IntelliJ In functions used by an RDD
I was able to get JavaWordCount running with a local instance under IntelliJ. In order to do so I needed to use maven to package my code and call

String[] jars = { "/SparkExamples/target/word-count-examples_2.10-1.0.0.jar" };
sparkConf.setJars(jars);

After that the sample ran properly and in the debugger I could set break points in the main. However when I do something like

JavaRDD<String> words = lines.flatMap(new WordsMapFunction());

where WordsMapFunction is a separate class like

public static class WordsMapFunction implements FlatMapFunction<String, String> {
    private static final Pattern SPACE = Pattern.compile(" ");

    public Iterable<String> call(String s) {
        String[] split = SPACE.split(s);
        for (int i = 0; i < split.length; i++) {
            split[i] = split[i].toUpperCase();
        }
        return Arrays.asList(split);
    }
}

breakpoints set in WordsMapFunction are never hit. Most of the interesting functionality in the problems I am trying to solve is in the FlatMapFunction and Function2 code, and this is the functionality I will need to examine in more detail. Has anyone figured out how to configure a project to hit breakpoints in these functions??
Re: How do you hit breakpoints using IntelliJ In functions used by an RDD
That was not quite in English. My flatMap code is shown below. I know the code is called since the answers are correct, but I would like to put a break point in dropNonLetters to make sure that code works properly. I am running in the IntelliJ debugger but believe the code is executing on a Spark worker. I am not sure what magic IntelliJ uses to hook up a debugger to a worker but hope it is possible.

public class WordsMapFunction implements FlatMapFunction<String, String> {
    private static final Pattern SPACE = Pattern.compile(" ");

    public Iterable<String> call(String s) {
        String[] split = SPACE.split(s);
        for (int i = 0; i < split.length; i++) {
            split[i] = regularizeString(split[i]);
        }
        return Arrays.asList(split);
    }

    public static String dropNonLetters(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (Character.isLetter(c))
                sb.append(c);
        }
        return sb.toString();
    }

    public static String regularizeString(String inp) {
        inp = inp.trim();
        inp = inp.toUpperCase();
        return dropNonLetters(inp);
    }
}

On Mon, Aug 25, 2014 at 10:35 AM, Sean Owen so...@cloudera.com wrote:
flatMap() is a transformation only. Calling it by itself does nothing; it just describes the relationship between one RDD and another. You should see it swing into action if you invoke an action, like count(), on the words RDD.

On Mon, Aug 25, 2014 at 6:32 PM, Steve Lewis lordjoe2...@gmail.com wrote:
I was able to get JavaWordCount running with a local instance under IntelliJ. In order to do so I needed to use maven to package my code and call

String[] jars = { "/SparkExamples/target/word-count-examples_2.10-1.0.0.jar" };
sparkConf.setJars(jars);

After that the sample ran properly and in the debugger I could set break points in the main.
However when I do something like

JavaRDD<String> words = lines.flatMap(new WordsMapFunction());

where WordsMapFunction is a separate class like

public static class WordsMapFunction implements FlatMapFunction<String, String> {
    private static final Pattern SPACE = Pattern.compile(" ");

    public Iterable<String> call(String s) {
        String[] split = SPACE.split(s);
        for (int i = 0; i < split.length; i++) {
            split[i] = split[i].toUpperCase();
        }
        return Arrays.asList(split);
    }
}

breakpoints set in WordsMapFunction are never hit. Most of the interesting functionality in the problems I am trying to solve is in the FlatMapFunction and Function2 code, and this is the functionality I will need to examine in more detail. Has anyone figured out how to configure a project to hit breakpoints in these functions??
-- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
I am struggling to run Spark Examples on my local machine
I downloaded the binaries for spark-1.0.2-hadoop1 and unpacked them on my Windows 8 box. I can execute spark-shell.cmd and get a command window which does the proper things. I open a browser to http://localhost:4040 and a window comes up describing the spark master.

Then using IntelliJ I create a project with JavaWordCount from the Spark distribution. When I run the job with -Dspark.master=spark://local[*]:7707 (I have tried MANY other strings) the job fails for failure to connect to the spark master. So my questions are:
1) Do I have a spark master running? How can I tell? Doesn't the web page say it is running?
2) How do I find the port on which the master is running and test that it is accepting jobs?
3) Are there other steps I need to take before I can run a simple Spark sample?

14/08/21 09:27:08 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
14/08/21 09:27:23 WARN scheduler.TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory
...
14/08/21 09:28:08 ERROR cluster.SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
14/08/21 09:28:08 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
14/08/21 09:28:08 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1
14/08/21 09:28:08 INFO scheduler.DAGScheduler: Failed to run collect at JavaWordCount.java:68
Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: All masters are unresponsive! Giving up.
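Two general Spark facts bear on the failure above: port 4040 is the web UI of a running application or shell (here, the spark-shell that was started), not of a standalone master - a standalone master's web UI defaults to port 8080 and its service port to 7077, not 7707 - and spark://local[*]:7707 mixes the local and standalone master syntaxes. For running samples from an IDE no standalone master is needed at all; setting the master to local[*] in code is enough. A minimal sketch:

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalRunner {
    public static void main(String[] args) {
        // "local[*]" runs the driver and all tasks inside this JVM, so no
        // standalone master or worker process needs to be listening anywhere
        SparkConf conf = new SparkConf()
                .setAppName("JavaWordCount")
                .setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... build and run RDDs here ...
        sc.stop();
    }
}
```

With this, no -Dspark.master flag or setJars() call is required, and because everything runs in one JVM, breakpoints set inside worker-side functions are also reachable from the IDE debugger.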
Re: Does anyone have a stand alone spark instance running on Windows
I have made a little progress - by downloading a prebuilt version of Spark I can call spark-shell.cmd and bring up a spark shell. In the shell things run. Next I go to my development environment and try to run JavaWordCount. I try

-Dspark.master=spark://local[*]:55519
-Dspark.master=spark://Asterix:7707 (Asterix is my machine)

and many other combinations. I can hit a web page http://asterix:4040/environment/ and see many details about a presumably running spark master, but the incantation to allow a simple job like JavaWordCount is escaping me. Oh yes - I am running on Windows 8. Any help would be appreciated, starting with how do I know a spark master is running and what port it is on.

On Sat, Aug 16, 2014 at 7:33 PM, Manu Suryavansh suryavanshi.m...@gmail.com wrote:
Hi, I have built spark-1.0.0 on Windows using Java 7/8 and I have been able to run several examples - here are my notes - http://ml-nlp-ir.blogspot.com/2014/04/building-spark-on-windows-and-cloudera.html on how to build from source and run examples in spark shell.
Regards, Manu

On Sat, Aug 16, 2014 at 12:14 PM, Steve Lewis lordjoe2...@gmail.com wrote:
I want to look at porting a Hadoop problem to Spark - eventually I want to run on a Hadoop 2.0 cluster, but while I am learning and porting I want to run small problems on my windows box. I installed scala and sbt. I downloaded Spark and in the spark directory can say

mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package

which succeeds. I tried sbt/sbt assembly, which fails with errors. In the documentation https://spark.apache.org/docs/latest/spark-standalone.html it says

*Note:* The launch scripts do not currently support Windows. To run a Spark cluster on Windows, start the master and workers by hand.
I can build and run samples (say JavaWordCount) to the point where they fail because a master cannot be found (none is running) I want to know how to get a spark master and a slave or two running on my windows box so I can look at the samples and start playing with Spark Does anyone have a windows instance running?? Please DON'T SAY I SHOULD RUN LINUX! if it is supposed to work on windows someone should have tested it and be willing to state how. -- Manu Suryavansh -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Re: Does anyone have a stand alone spark instance running on Windows
OK I tried your build. First you need to put sbt in C:\sbt. Then you get:

Microsoft Windows [Version 6.2.9200]
(c) 2012 Microsoft Corporation. All rights reserved.

e:\> which java
/cygdrive/c/Program Files/Java/jdk1.6.0_25/bin/java

e:\> java -version
java version 1.6.0_25
Java(TM) SE Runtime Environment (build 1.6.0_25-b06)

e:\spark> sbt_opt.bat
e:\spark> set SCRIPT_DIR=C:\sbt\
e:\spark> java -Xms512m -Xmx2g -Xss1M -XX:+CMSClassUnloadingEnabled -XX:MaxPermSize=128m -jar C:\sbt\sbt-launch.jar
[ERROR] Terminal initialization failed; falling back to unsupported
java.lang.IncompatibleClassChangeError: Found class jline.Terminal, but interface was expected
        at jline.TerminalFactory.create(TerminalFactory.java:101)
        at jline.TerminalFactory.get(TerminalFactory.java:159)
        at sbt.ConsoleLogger$.ansiSupported(ConsoleLogger.scala:86)
        at sbt.ConsoleLogger$.<init>(ConsoleLogger.scala:80)
        at sbt.ConsoleLogger$.<clinit>(ConsoleLogger.scala)
        at sbt.GlobalLogging$.initial(GlobalLogging.scala:40)
        at sbt.StandardMain$.initialGlobalLogging(Main.scala:64)
        at sbt.StandardMain$.initialState(Main.scala:73)
        at sbt.xMain.run(Main.scala:29)
        at xsbt.boot.Launch$.run(Launch.scala:55)
        at xsbt.boot.Launch$$anonfun$explicit$1.apply(Launch.scala:45)
        at xsbt.boot.Launch$.launch(Launch.scala:69)
        at xsbt.boot.Launch$.apply(Launch.scala:16)
        at xsbt.boot.Boot$.runImpl(Boot.scala:31)
        at xsbt.boot.Boot$.main(Boot.scala:20)
        at xsbt.boot.Boot.main(Boot.scala)
java.lang.IncompatibleClassChangeError: JLine incompatibility detected. Check that the sbt launcher is version 0.13.x or later.
        at sbt.ConsoleLogger$.ansiSupported(ConsoleLogger.scala:97)
        at sbt.ConsoleLogger$.<init>(ConsoleLogger.scala:80)
        at sbt.ConsoleLogger$.<clinit>(ConsoleLogger.scala)
        at sbt.GlobalLogging$.initial(GlobalLogging.scala:40)
        at sbt.StandardMain$.initialGlobalLogging(Main.scala:64)
        at sbt.StandardMain$.initialState(Main.scala:73)
        at sbt.xMain.run(Main.scala:29)
        at xsbt.boot.Launch$.run(Launch.scala:55)
        at xsbt.boot.Launch$$anonfun$explicit$1.apply(Launch.scala:45)
        at xsbt.boot.Launch$.launch(Launch.scala:69)
        at xsbt.boot.Launch$.apply(Launch.scala:16)
        at xsbt.boot.Boot$.runImpl(Boot.scala:31)
        at xsbt.boot.Boot$.main(Boot.scala:20)
        at xsbt.boot.Boot.main(Boot.scala)
Error during sbt execution: java.lang.IncompatibleClassChangeError: JLine incompatibility detected. Check that the sbt launcher is version 0.13.x or later.

I believe my version of sbt is 0.13. Finally, even if I could build Spark I still don't see how to launch a server.

On Sat, Aug 16, 2014 at 7:33 PM, Manu Suryavansh suryavanshi.m...@gmail.com wrote:
Hi, I have built spark-1.0.0 on Windows using Java 7/8 and I have been able to run several examples - here are my notes - http://ml-nlp-ir.blogspot.com/2014/04/building-spark-on-windows-and-cloudera.html on how to build from source and run examples in spark shell.
Regards, Manu

On Sat, Aug 16, 2014 at 12:14 PM, Steve Lewis lordjoe2...@gmail.com wrote:
I want to look at porting a Hadoop problem to Spark - eventually I want to run on a Hadoop 2.0 cluster but while I am learning and porting I want to run small problems on my windows box. I installed scala and sbt. I downloaded Spark and in the spark directory can say

mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package

which succeeds. I tried sbt/sbt assembly, which fails with errors. In the documentation https://spark.apache.org/docs/latest/spark-standalone.html it says

*Note:* The launch scripts do not currently support Windows. To run a Spark cluster on Windows, start the master and workers by hand.
with no indication of how to do this. I can build and run samples (say JavaWordCount) to the point where they fail because a master cannot be found (none is running) I want to know how to get a spark master and a slave or two running on my windows box so I can look at the samples and start playing with Spark Does anyone have a windows instance running?? Please DON'T SAY I SHOULD RUN LINUX! if it is supposed to work on windows someone should have tested it and be willing to state how. -- Manu Suryavansh -- Steven M. Lewis PhD 4221 105th Ave NE Kirkland, WA 98033 206-384-1340 (cell) Skype lordjoe_com
Does anyone have a stand alone spark instance running on Windows
I want to look at porting a Hadoop problem to Spark - eventually I want to run on a Hadoop 2.0 cluster, but while I am learning and porting I want to run small problems on my windows box. I installed scala and sbt. I downloaded Spark and in the spark directory can say

mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package

which succeeds. I tried sbt/sbt assembly, which fails with errors. In the documentation https://spark.apache.org/docs/latest/spark-standalone.html it says

*Note:* The launch scripts do not currently support Windows. To run a Spark cluster on Windows, start the master and workers by hand.

with no indication of how to do this. I can build and run samples (say JavaWordCount) to the point where they fail because a master cannot be found (none is running). I want to know how to get a spark master and a slave or two running on my windows box so I can look at the samples and start playing with Spark. Does anyone have a windows instance running?? Please DON'T SAY I SHOULD RUN LINUX! If it is supposed to work on windows someone should have tested it and be willing to state how.