Re: SparkSQL errors in 1.4 rc when using with Hive 0.12 metastore
This discussion belongs on the dev list. Please post any replies there.

On Sat, May 23, 2015 at 10:19 PM, Cheolsoo Park piaozhe...@gmail.com wrote:

Hi,

I've been testing SparkSQL in 1.4 rc and found two issues. I wanted to confirm whether these are bugs or not before opening a jira.

*1)* I can no longer compile SparkSQL with -Phive-0.12.0. I noticed that in 1.4, IsolatedClientLoader is introduced, and different versions of Hive metastore jars can be loaded at runtime. But as a result, SparkSQL no longer compiles with Hive 0.12.0. My question is, is this intended? If so, shouldn't the hive-0.12.0 profile in the POM be removed?

*2)* After compiling SparkSQL with -Phive-0.13.1, I ran into my 2nd problem. Since I have a Hive 0.12 metastore in production, I have to use it for now. But even if I set spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars, the SparkSQL CLI throws an error as follows:

15/05/24 05:03:29 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect.
org.apache.thrift.TApplicationException: Invalid method name: 'get_functions'
    at org.apache.thrift.TApplicationException.read(TApplicationException.java:108)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_functions(ThriftHiveMetastore.java:2886)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_functions(ThriftHiveMetastore.java:2872)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getFunctions(HiveMetaStoreClient.java:1727)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
    at com.sun.proxy.$Proxy12.getFunctions(Unknown Source)
    at org.apache.hadoop.hive.ql.metadata.Hive.getFunctions(Hive.java:2670)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames(FunctionRegistry.java:674)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames(FunctionRegistry.java:662)
    at org.apache.hadoop.hive.cli.CliDriver.getCommandCompletor(CliDriver.java:540)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:175)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)

What's happening is that when the SparkSQL CLI starts up, it tries to fetch permanent udfs from the Hive metastore (due to HIVE-6330 https://issues.apache.org/jira/browse/HIVE-6330, which was introduced in Hive 0.13). But then it ends up invoking an incompatible thrift function that doesn't exist in Hive 0.12.

To work around this error, I have to comment out the following line of code for now-
https://goo.gl/wcfnH1

My question is, is SparkSQL that is compiled against Hive 0.13 supposed to work with a Hive 0.12 metastore (by setting spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars)? It only works if I comment out the above line of code.

Thanks,
Cheolsoo
how to distributed run a bash shell in spark
hello there

I am trying to run an app in which part of it needs to run a shell. How do I run a shell distributed across the spark cluster? Thanks. Here's my code:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class ShellCompare {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("ShellCompare").setMaster("spark://master:7077").set("spark.executor.memory", "6g");
        JavaSparkContext sc = new JavaSparkContext(conf);
        for (int i = 1; i <= 21; i++) {
            execShell(i);
        }
        // execShell(1);
        sc.stop();
    }

    private static void execShell(int i) {
        // Build the command: sort.sh <chrN file> <sample file> <result file> 600
        String shpath = "/opt/sh/bin/sort.sh";
        Process process = null;
        String var = "/opt/data/shellcompare/chr" + i + ".txt /opt/data/shellcompare/samplechr" + i + ".txt /opt/data/shellcompare/result.txt 600";
        // String var = "/opt/data/chr1.txt /opt/data/chr1sample.txt /opt/sh/bin/result.txt 600";
        String command2 = "sh " + shpath + " " + var;
        try {
            // Note: this exec runs on the driver process, not on the executors
            process = Runtime.getRuntime().exec(command2);
            process.waitFor();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Thanks & Best regards!
San.Luo
Spark dramatically slow when I add saveAsTextFile
*Problem Description*:
The program runs on a stand-alone spark cluster (1 master, 6 workers with 8g ram and 2 cores).
Input: a 468MB file with 133433 records stored in HDFS.
Output: just a 2MB file will be stored in HDFS.
The program has two map operations and one reduceByKey operation. Finally I save the result to HDFS using *saveAsTextFile*.

*Problem*: if I don't add saveAsTextFile, the program runs very fast (a few seconds); otherwise it is extremely slow, taking about 30 mins.

*My program (is very simple)*:

public static void main(String[] args) throws IOException {
    /** Parameter Setting ***/
    String localPointPath = "/home/hduser/skyrock/skyrockImageFeatures.csv";
    String remoteFilePath = "hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv";
    String outputPath = "hdfs://HadoopV26Master:9000/user/sparkoutput/";
    final int row = 133433;
    final int col = 458;
    final double dc = Double.valueOf(args[0]);

    SparkConf conf = new SparkConf().setAppName("distance")
        .set("spark.executor.memory", "4g")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.eventLog.enabled", "true");
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> textFile = sc.textFile(remoteFilePath);
    // Broadcast variable, the dimension of this double array: 133433*458
    final Broadcast<double[][]> broadcastPoints = sc.broadcast(createBroadcastPoints(localPointPath, row, col));

    /**
     * Compute the distance in terms of each point on each instance.
     * distance list: index = n(i-1) - i*(i-1)/2 + j-i-1
     */
    JavaPairRDD<Integer, Double> distance = textFile.flatMapToPair(new PairFlatMapFunction<String, Integer, Double>() {
        public Iterable<Tuple2<Integer, Double>> call(String v1) throws Exception {
            List<String> al = Arrays.asList(v1.split(","));
            double[] featureVals = new double[al.size()];
            for (int j = 0; j < al.size() - 1; j++)
                featureVals[j] = Double.valueOf(al.get(j + 1));
            int jIndex = Integer.valueOf(al.get(0));
            double[][] allPoints = broadcastPoints.value();
            double sum = 0;
            List<Tuple2<Integer, Double>> list = new ArrayList<Tuple2<Integer, Double>>();
            for (int i = 0; i < row; i++) {
                sum = 0;
                for (int j = 0; j < al.size() - 1; j++) {
                    sum += (allPoints[i][j] - featureVals[j]) * (allPoints[i][j] - featureVals[j]);
                }
                list.add(new Tuple2<Integer, Double>(jIndex, Math.sqrt(sum)));
            }
            return list;
        }
    });

    // Create zeroOne density
    JavaPairRDD<Integer, Integer> densityZeroOne = distance.mapValues(new Function<Double, Integer>() {
        public Integer call(Double v1) throws Exception {
            if (v1 < dc)
                return 1;
            else
                return 0;
        }
    });

    // Combine the density
    JavaPairRDD<Integer, Integer> counts = densityZeroOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
        public Integer call(Integer v1, Integer v2) throws Exception {
            return v1 + v2;
        }
    });

    counts.saveAsTextFile(outputPath + args[1]);
    sc.stop();
}

*If I comment out saveAsTextFile, the log will be:*

Picked up _JAVA_OPTIONS: -Xmx4g
15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1
15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/05/24 15:21:30 INFO spark.SecurityManager: Changing view acls to: hduser
15/05/24 15:21:30 INFO spark.SecurityManager: Changing modify acls to: hduser
15/05/24 15:21:30 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hduser); users with modify permissions: Set(hduser)
15/05/24 15:21:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/05/24 15:21:31 INFO Remoting:
Re: Strange ClassNotFound exception
Can you pastebin the class path ? Thanks On May 24, 2015, at 5:02 AM, boci boci.b...@gmail.com wrote: Yeah, I have same jar with same result, I run in docker container and I using same docker container with my another project... the only difference is the postgresql jdbc driver and the custom RDD... no additional dependencies (both single jar generated with same assembly configuration with same dependencies) and the second is work like a charm Another idea? 2015. máj. 24. 2:41 ezt írta (Ted Yu yuzhih...@gmail.com): In my local maven repo, I found: $ jar tvf /Users/tyu/.m2/repository//org/spark-project/akka/akka-actor_2.10/2.3.4-spark/akka-actor_2.10-2.3.4-spark.jar | grep SelectionPath 521 Mon Sep 29 12:05:36 PDT 2014 akka/actor/SelectionPathElement.class Is the above jar in your classpath ? On Sat, May 23, 2015 at 5:05 PM, boci boci.b...@gmail.com wrote: Hi guys! I have a small spark application. It's query some data from postgres, enrich it and write to elasticsearch. When I deployed into spark container I got a very fustrating error: https://gist.github.com/b0c1/66527e00bada1e4c0dc3 Spark version: 1.3.1 Hadoop version: 2.6.0 Additional info: serialization: kryo rdd: custom rdd to query I not understand 1. akka.actor.SelectionPath doesn't exists in 1.3.1 2. I checked all dependencies in my project, I only have org.spark-project.akka:akka-*_2.10:2.3.4-spark:jar doesn't have 3. I not found any reference for this... 4. I created own RDD, it's work, but I need to register to kryo? (mapRow using ResultSet, I need to create 5. I used some months ago and it's already worked with spark 1.2... I recompiled with 1.3.1 but I got this strange error Any idea? -- Skype: boci13, Hangout: boci.b...@gmail.com
Re: Strange ClassNotFound exception
Yeah, I have the same jar with the same result. I run in a docker container, and I'm using the same docker container with another project of mine... the only difference is the postgresql jdbc driver and the custom RDD... no additional dependencies (both single jars are generated with the same assembly configuration with the same dependencies), and the second one works like a charm. Another idea?

On 24 May 2015 at 2:41, Ted Yu yuzhih...@gmail.com wrote:
In my local maven repo, I found:
$ jar tvf /Users/tyu/.m2/repository//org/spark-project/akka/akka-actor_2.10/2.3.4-spark/akka-actor_2.10-2.3.4-spark.jar | grep SelectionPath
   521 Mon Sep 29 12:05:36 PDT 2014 akka/actor/SelectionPathElement.class
Is the above jar in your classpath ?

On Sat, May 23, 2015 at 5:05 PM, boci boci.b...@gmail.com wrote:
Hi guys!

I have a small spark application. It queries some data from postgres, enriches it and writes it to elasticsearch. When I deployed it into the spark container I got a very frustrating error:
https://gist.github.com/b0c1/66527e00bada1e4c0dc3

Spark version: 1.3.1
Hadoop version: 2.6.0
Additional info: serialization: kryo, rdd: custom rdd to query

What I don't understand:
1. akka.actor.SelectionPath doesn't exist in 1.3.1
2. I checked all dependencies in my project; I only have org.spark-project.akka:akka-*_2.10:2.3.4-spark:jar, which doesn't have it
3. I haven't found any reference for this...
4. I created my own RDD; it works, but do I need to register it with kryo? (mapRow using ResultSet, I need to create
5. I used it some months ago and it already worked with spark 1.2... I recompiled with 1.3.1 but I got this strange error

Any idea?

--
Skype: boci13, Hangout: boci.b...@gmail.com
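For reference, point 4 above (registering the custom RDD's classes with Kryo) usually comes down to listing the classes on the SparkConf. A minimal sketch in Scala, where EnrichedRecord is a hypothetical stand-in for whatever classes the custom postgres RDD actually produces:

import org.apache.spark.SparkConf

// EnrichedRecord is a placeholder; register whatever classes actually
// cross the wire between executors or get cached/shuffled.
case class EnrichedRecord(id: Long, payload: String)

val conf = new SparkConf()
  .setAppName("postgres-to-es")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Unregistered classes still work with Kryo, they just serialize less
  // compactly because the full class name is written with every object.
  .registerKryoClasses(Array(classOf[EnrichedRecord]))

This does not by itself explain the missing akka class, but it answers the "do I need to register to kryo?" question: registration is optional for correctness, helpful for size.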
Re: Spark Streaming - Design considerations/Knobs
Really good list to brush up basics. Just one input, regarding * An RDD's processing is scheduled by driver's jobscheduler as a job. At a given point of time only one job is active. So, if one job is executing the other jobs are queued. We can have multiple jobs running in a given application at a point of time, if they are submitted from different threads. So essentially in a single threaded application, the above statement holds true. Regards, Sam On May 24, 2015, at 1:14 PM, Tathagata Das t...@databricks.commailto:t...@databricks.com wrote: Blocks are replicated immediately, before the driver launches any jobs using them. On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat hemant9...@gmail.commailto:hemant9...@gmail.com wrote: Honestly, given the length of my email, I didn't expect a reply. :-) Thanks for reading and replying. However, I have a follow-up question: I don't think if I understand the block replication completely. Are the blocks replicated immediately after they are received by the receiver? Or are they kept on the receiver node only and are moved only on shuffle? Has the replication something to do with locality.wait? Thanks, Hemant On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.commailto:t...@databricks.com wrote: Correcting the ones that are incorrect or incomplete. BUT this is good list for things to remember about Spark Streaming. On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.commailto:hemant9...@gmail.com wrote: Hi, I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on spark streaming. Is my understanding correct? Any other important design consideration that I should take care of? * A DStream is associated with a single receiver. For attaining read parallelism multiple receivers i.e. multiple DStreams need to be created. * A receiver is run within an executor. It occupies one core. Ensure that there are enough cores for processing after receiver slots are booked i.e. spark.cores.max should take the receiver slots into account. * The receivers are allocated to executors in a round robin fashion. * When data is received from a stream source, receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval where N = batchInterval/blockInterval. * These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing. * A RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in spark. blockInterval== batchinterval would mean that a single partition is created and probably it is processed locally. The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that has the blocks irrespective of block interval, unless non-local scheduling kicks in (as you observed next). * Having bigger blockinterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found out between these two parameters to ensure that the bigger blocks are processed locally. 
* Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions.
Yes, for greater parallelism. Though it comes at the cost of a shuffle.
* An RDD's processing is scheduled by the driver's jobscheduler as a job. At a given point of time only one job is active. So, if one job is executing, the other jobs are queued.
* If you have two dstreams there will be two RDDs formed and there will be two jobs created, which will be scheduled one after the other.
* To avoid this, you can union the two dstreams. This will ensure that a single unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then considered as a single job. However the partitioning of the RDDs is not impacted.
To further clarify, the jobs depend on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations.
dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }    // one Spark job per batch
dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } }    // TWO Spark jobs per batch
dstream1.foreachRDD { rdd => rdd.count }; dstream2.foreachRDD { rdd => rdd.count }    // TWO Spark jobs per batch
* If the batch processing time is more than the batchinterval then obviously the receiver's memory will start filling
Re: Spark dramatically slow when I add saveAsTextFile
This may sound like an obvious question, but are you sure that the program is doing any work when you don't have a saveAsTextFile? If there are transformations but no actions to actually collect the data, there's no need for Spark to execute the transformations. As to the question of 'is this taking too long', I can't answer that. But your code was HTML escaped and therefore difficult to read, perhaps you should post a link to a Gist. Joe On 24 May 2015 at 10:36, allanjie allanmcgr...@gmail.com wrote: *Problem Description*: The program running in stand-alone spark cluster (1 master, 6 workers with 8g ram and 2 cores). Input: a 468MB file with 133433 records stored in HDFS. Output: just 2MB file will stored in HDFS The program has two map operations and one reduceByKey operation. Finally I save the result to HDFS using *saveAsTextFile*. *Problem*: if I don't add saveAsTextFile, the program runs very fast(a few seconds), otherwise extremely slow until about 30 mins. *My program (is very Simple)* public static void main(String[] args) throws IOException{ /**Parameter Setting***/ String localPointPath = /home/hduser/skyrock/skyrockImageFeatures.csv; String remoteFilePath = hdfs://HadoopV26Master:9000/user/skyrock/skyrockImageIndexedFeatures.csv; String outputPath = hdfs://HadoopV26Master:9000/user/sparkoutput/; final int row = 133433; final int col = 458; final double dc = Double.valueOf(args[0]); SparkConf conf = new SparkConf(). setAppName(distance) .set(spark.executor.memory, 4g).set(spark.serializer, org.apache.spark.serializer.KryoSerializer) .set(spark.eventLog.enabled, true); JavaSparkContext sc = new JavaSparkContext(conf); JavaRDDString textFile = sc.textFile(remoteFilePath); //Broadcast variable, the dimension of this double array: 133433*458 final Broadcastdouble[][] broadcastPoints = sc.broadcast(createBroadcastPoints(localPointPath,row,col)); /** * Compute the distance in terms of each point on each instance. * distance list: index = n(i-1)- i*(i-1)/2 + j-i-1 */ JavaPairRDDInteger,Double distance = textFile.flatMapToPair(new PairFlatMapFunctionString, Integer, Double(){ public IterableTuple2lt;Integer, Double call(String v1) throws Exception{ ListString al = Arrays.asList(v1.split(,)); double[] featureVals = new double[al.size()]; for(int j=0;jal.size()-1;j++) featureVals[j] = Double.valueOf(al.get(j+1)); int jIndex = Integer.valueOf(al.get(0)); double[][] allPoints = broadcastPoints.value(); double sum = 0; Listlt;Tuple2lt;Integer, Double list = new ArrayListTuple2lt;Integer, Double(); for(int i=0;irow; i++){ sum = 0; for(int j=0;jlt;al.size()-1;j++){ sum += (allPoints[i][j]-featureVals[j])*(allPoints[i][j]-featureVals[j]); } list.add(new Tuple2lt;Integer,Double(jIndex, Math.sqrt(sum) )); } return list; } }); //Create zeroOne density JavaPairRDDInteger, Integer densityZeroOne = distance.mapValues(new FunctionDouble, Integer(){ public Integer call(Double v1) throws Exception { if(v1dc) return 1; else return 0; } }); // //Combine the density JavaPairRDDlt;Integer, Integer counts = densityZeroOne.reduceByKey(new Function2Integer, Integer,Integer() { public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } }); counts.*saveAsTextFile*(outputPath+args[1]); sc.stop(); } *If I comment saveAsTextFile, log will be:* Picked up _JAVA_OPTIONS: -Xmx4g 15/05/24 15:21:30 INFO spark.SparkContext: Running Spark version 1.3.1 15/05/24 15:21:30 WARN util.NativeCodeLoader: Unable to load
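To make Joe's point concrete, here is a minimal Scala sketch (paths and parsing are placeholders, not the original program; sc is an existing SparkContext) of why dropping the save can make a job look instant:

val lines = sc.textFile("hdfs:///some/input.csv")
val widths = lines.map(_.split(",").length)        // transformation: lazy, nothing executes yet

// Without an action, the map above never runs, so the program "finishes" in seconds.
// Any action forces the whole lineage to execute:
widths.count()                                     // triggers the computation
widths.saveAsTextFile("hdfs:///some/output")       // triggers it again unless the RDD is cached

So the fair comparison is not "with vs. without saveAsTextFile" but "with saveAsTextFile vs. with some other action such as count()".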
Re: how to distributed run a bash shell in spark
You mean you want to execute some shell commands from spark? Here's something i tried a while back. https://github.com/akhld/spark-exploit Thanks Best Regards On Sun, May 24, 2015 at 4:53 PM, luohui20...@sina.com wrote: hello there I am trying to run a app in which part of it needs to run a shell.how to run a shell distributed in spark cluster.thanks. here's my code: import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; public class ShellCompare { public static void main(String[] args) { // TODO Auto-generated method stub SparkConf conf = new SparkConf().setAppName(ShellCompare).setMaster(spark://master:7077).set(spark.executor.memory, 6g); JavaSparkContext sc = new JavaSparkContext(conf); for(int i=1;i=21;i++){ execShell(i); } //execShell(1); sc.stop(); } private static void execShell(int i) { String shpath=/opt/sh/bin/sort.sh; Process process =null; String var=/opt/data/shellcompare/chr + i +.txt /opt/data/shellcompare/samplechr + i +.txt /opt/data/shellcompare/result.txt 600; //String var=/opt/data/chr1.txt /opt/data/chr1sample.txt /opt/sh/bin/result.txt 600; String command2 = sh + shpath + + var; try { process = Runtime.getRuntime().exec(command2); process.waitFor(); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } Thanksamp;Best regards! San.Luo
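For completeness, one commonly used way to push the external script onto the workers (rather than exec'ing it on the driver, as the posted code does) is RDD.pipe. A rough Scala sketch, assuming sort.sh exists on every worker, can read its argument line from stdin instead of the command line, and with file names adjusted so parallel runs don't collide:

// Distribute the 21 chromosome indices across the cluster, one per partition.
val chromosomes = sc.parallelize(1 to 21, 21)

// pipe() launches one external process per partition on whichever executor
// holds it, writes the partition's records to the process's stdin, and
// returns its stdout as a new RDD of lines.
val results = chromosomes
  .map(i => s"/opt/data/shellcompare/chr$i.txt /opt/data/shellcompare/samplechr$i.txt /opt/data/shellcompare/result$i.txt 600")
  .pipe("/opt/sh/bin/sort.sh")

results.collect().foreach(println)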
Re: Trying to connect to many topics with several DirectConnect
I used to hit an NPE when I didn't add all the dependency jars to my context while running it in standalone mode. Can you try adding all these dependencies to your context?

sc.addJar("/home/akhld/.ivy2/cache/org.apache.spark/spark-streaming-kafka_2.10/jars/spark-streaming-kafka_2.10-1.3.1.jar")
sc.addJar("/home/akhld/.ivy2/cache/org.apache.kafka/kafka_2.10/jars/kafka_2.10-0.8.1.1.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.101tec/zkclient/jars/zkclient-0.3.jar")
sc.addJar("/home/akhld/.ivy2/cache/com.yammer.metrics/metrics-core/jars/metrics-core-2.2.0.jar")

Thanks
Best Regards

On Fri, May 22, 2015 at 5:20 PM, Guillermo Ortiz konstt2...@gmail.com wrote:
Hi,

I'm trying to connect to two topics of Kafka with Spark with DirectStream but I get an error. I don't know if there's any limitation to doing this, because when I just access one topic everything is fine.

val ssc = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
val setTopic1 = Set("topic1")
val setTopic2 = Set("topic2")
val stream1 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic1)
val stream2 = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, setTopic2)

The error that I get is:

15/05/22 13:12:40 INFO BlockManager: Removing RDD 314
15/05/22 13:12:40 ERROR OneForOneStrategy:
java.lang.NullPointerException
    at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
    at org.apache.spark.streaming.DStreamGraph$$anonfun$getMaxInputStreamRememberDuration$2.apply(DStreamGraph.scala:168)
    at scala.collection.TraversableOnce$$anonfun$maxBy$1.apply(TraversableOnce.scala:225)
    at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
    at scala.collection.IndexedSeqOptimized$class.reduceLeft(IndexedSeqOptimized.scala:68)
    at scala.collection.mutable.ArrayBuffer.reduceLeft(ArrayBuffer.scala:47)

Are there any limitations that would prevent doing this?
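Not a confirmed fix for the NPE above, but worth noting: createDirectStream takes a Set of topic names, so a single direct stream can cover both topics if having two separate streams turns out to be the trigger. A sketch using the same parameters as above (ssc is the StreamingContext from the snippet):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

val kafkaParams = Map[String, String]("metadata.broker.list" -> "quickstart.cloudera:9092")
// One stream subscribed to both topics instead of two separate streams.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("topic1", "topic2"))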
Help optimizing some spark code
Hi,
I'm running this piece of code in my program:

smallRdd.join(largeRdd)
  .groupBy { case (id, (_, X(a, _, _))) => a }
  .map { case (a, iterable) => a -> iterable.size }
  .sortBy({ case (_, count) => count }, ascending = false)
  .take(k)

where basically:
smallRdd is an rdd of (Long, Unit) and it has thousands of entries in it (it was an rdd of longs but I needed a tuple for the join).
largeRdd is an rdd of (Long, X) where X is a case class containing 3 Longs, and it has millions of entries in it.
Both rdds have already been sorted by key.

What I want is, out of the intersection between the rdds (by key, which is the first Long in the tuple), to find the top k which have the most appearances of the same first value in X (a in this example).

This code works but takes way too long; it can take up to 10 minutes on rdds of sizes 20,000 and 8,000,000. I've been playing around with commenting out different lines in the process and running it, and I can't seem to find a clear bottleneck; each line seems to be quite costly.

I am wondering if anyone can think of a better way to do this:
* especially wondering if I should use IndexedRDD for the join; would it significantly improve the join performance?
* I really don't like the fact that I'm sorting thousands of entries just to get the top k, where usually k is much smaller than smallRdd.count. Is there some kind of select-kth I can do on rdds (and then just filter the bigger elements)? Or any other way to improve what's happening here?

thanks!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Help-optimizing-some-spark-code-tp23006.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
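One direction worth trying for the top-k question, sketched under the same assumptions as the snippet above (smallRdd, largeRdd, the X case class and k already in scope): replace groupBy with reduceByKey so only (a, count) pairs are shuffled rather than whole groups, and replace sortBy(...).take(k) with RDD.top, which keeps just k candidates per partition instead of sorting everything:

val topK = smallRdd.join(largeRdd)
  .map { case (_, (_, X(a, _, _))) => (a, 1L) }      // emit one count per joined row
  .reduceByKey(_ + _)                                 // combines map-side, unlike groupBy
  .top(k)(Ordering.by { case (_, count) => count })   // top-k without a full sort

This is a sketch, not a benchmarked answer; the join itself may still dominate, which is where IndexedRDD or a broadcast of the small side would come in.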
Powered by Spark listing
Information Innovators, Inc. http://www.iiinfo.com/ Spark, Spark Streaming, Spark SQL, MLLib Developing data analytics systems for federal healthcare, national defense and other programs using Spark on YARN. -- This page tracks the users of Spark. To add yourself to the list, please email user@spark.apache.org with your organization name, URL, a list of which Spark components you are using, and a short description of your use case.
Re: How to use zookeeper in Spark Streaming
I think the Zookeeper watcher code should reside in task code. I haven't found a guide on this subject so far.

Cheers

On Sun, May 24, 2015 at 7:15 PM, bit1...@163.com bit1...@163.com wrote:
Can someone please help me on this?

--
bit1...@163.com

*From:* bit1...@163.com
*Sent:* 2015-05-24 13:53
*To:* user user@spark.apache.org
*Subject:* How to use zookeeper in Spark Streaming

Hi,
In my spark streaming application, when the application starts and gets running, the Tasks running on the Worker nodes need to be notified that some configurations have been changed from time to time; these configurations reside on Zookeeper. My question is, where should I put the code that works with Zookeeper for the configuration change, in the Driver code or in the Task code? Is there some guide on this? Thanks.

--
bit1...@163.com
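A rough Scala sketch of what "in task code" can look like in practice, assuming an existing input DStream and with the connect string and znode path as placeholders: keep the ZooKeeper client in a singleton object so each executor JVM creates it lazily inside a task, rather than trying to ship a client from the driver:

import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}

object ConfigWatcher {
  // Created once per executor JVM, the first time a task touches it.
  lazy val zk = new ZooKeeper("zkhost:2181", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit = { /* react to a config change here */ }
  })
  def currentConfig(): String =
    new String(zk.getData("/myapp/config", true, null))   // watch=true re-registers the watch
}

// dstream stands for whatever input DStream the application already has.
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val cfg = ConfigWatcher.currentConfig()   // executes on the executor, not the driver
    records.foreach(r => println(s"$cfg -> $r"))   // placeholder for the real per-record processing
  }
}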
Reply: Re: how to distributed run a bash shell in spark
Thanks Akhil, your code is a big help to me, 'cause a perl script is exactly the thing I want to try to run in spark. I will have a try.

Thanks & Best regards!
San.Luo

----- Original Message -----
From: Akhil Das ak...@sigmoidanalytics.com
To: 罗辉 luohui20...@sina.com
Cc: user user@spark.apache.org
Subject: Re: how to distributed run a bash shell in spark
Date: 2015-05-25 00:53

You mean you want to execute some shell commands from spark? Here's something i tried a while back. https://github.com/akhld/spark-exploit

Thanks
Best Regards

On Sun, May 24, 2015 at 4:53 PM, luohui20...@sina.com wrote:
hello there I am trying to run a app in which part of it needs to run a shell.how to run a shell distributed in spark cluster.thanks. here's my code:
import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.DataFrame; import org.apache.spark.sql.Row; import org.apache.spark.sql.SQLContext; import org.apache.spark.sql.types.DataType; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; public class ShellCompare { public static void main(String[] args) { // TODO Auto-generated method stub SparkConf conf = new SparkConf().setAppName(ShellCompare).setMaster(spark://master:7077).set(spark.executor.memory, 6g); JavaSparkContext sc = new JavaSparkContext(conf); for(int i=1;i=21;i++){ execShell(i); } //execShell(1); sc.stop(); } private static void execShell(int i) { String shpath=/opt/sh/bin/sort.sh; Process process =null; String var=/opt/data/shellcompare/chr + i +.txt /opt/data/shellcompare/samplechr + i +.txt /opt/data/shellcompare/result.txt 600; //String var=/opt/data/chr1.txt /opt/data/chr1sample.txt /opt/sh/bin/result.txt 600; String command2 = sh + shpath + + var; try { process = Runtime.getRuntime().exec(command2); process.waitFor(); } catch (InterruptedException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
Thanks & Best regards!
San.Luo
Using Spark like a search engine
HI! We are developing a scoring system for recruitment. A recruiter enters vacancy requirements, and we score tens of thousands of CVs against these requirements and return e.g. the top 10 matches. We do not use fulltext search and sometimes don't even filter input CVs prior to scoring (some vacancies do not have mandatory requirements that can be used as a filter effectively). So we have a scoring function F(CV, VACANCY) that is currently implemented in SQL and runs on a Postgresql cluster. In the worst case F is executed once on every CV in the database. The VACANCY part is fixed for one query but changes between queries, and there's very little we can process in advance. We expect to have about 100 000 000 CVs in the next year, and do not expect our current implementation to offer the desired low-latency response (< 1 s) on 100M CVs. So we are looking for a horizontally scalable and fault-tolerant in-memory solution. Will Spark be useful for our task? All tutorials I could find describe stream processing, or ML applications. What Spark extensions/backends can be useful?

With best regards, Segey Melekhin
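For what it's worth, the shape such a job usually takes in Spark is roughly the following Scala sketch (field names, the load path, and the scoring function are invented stand-ins for F; only the pattern matters): cache all CVs in cluster memory once, broadcast the vacancy per query, score, and take the top 10 without a full sort. sc is an existing SparkContext.

case class CV(id: Long, features: Map[String, Double])
case class Vacancy(requirements: Map[String, Double])

// Stand-in for F(CV, VACANCY); the real function would be ported from SQL.
def score(cv: CV, v: Vacancy): Double =
  v.requirements.map { case (name, weight) => weight * cv.features.getOrElse(name, 0.0) }.sum

// Load once and keep in memory; every query scans this cached RDD.
val cvs = sc.objectFile[CV]("hdfs:///cvs").cache()

def topMatches(vacancy: Vacancy, k: Int = 10): Array[(Double, CV)] = {
  val vb = sc.broadcast(vacancy)                   // ship the vacancy to executors once per query
  cvs.map(cv => (score(cv, vb.value), cv))
     .top(k)(Ordering.by { case (s, _) => s })     // top-k avoids sorting all 100M rows
}

Whether this meets a sub-second target depends heavily on cluster size; note also that Spark's per-job scheduling overhead is typically on the order of 100 ms, which matters when the budget is 1 s per query.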
Reply: How to use zookeeper in Spark Streaming
Can someone please help me on this?

bit1...@163.com

From: bit1...@163.com
Sent: 2015-05-24 13:53
To: user
Subject: How to use zookeeper in Spark Streaming

Hi,
In my spark streaming application, when the application starts and gets running, the Tasks running on the Worker nodes need to be notified that some configurations have been changed from time to time; these configurations reside on Zookeeper. My question is, where should I put the code that works with Zookeeper for the configuration change, in the Driver code or in the Task code? Is there some guide on this? Thanks.

bit1...@163.com
RE: SparkSQL errors in 1.4 rc when using with Hive 0.12 metastore
Thanks for reporting this. We intend to support multiple metastore versions in a single build (hive-0.13.1) by introducing the IsolatedClientLoader, but you're probably hitting a bug. Please file a jira issue for this; I will keep investigating it as well.

Hao

From: Mark Hamstra [mailto:m...@clearstorydata.com]
Sent: Sunday, May 24, 2015 9:06 PM
To: Cheolsoo Park
Cc: user@spark.apache.org; d...@spark.apache.org
Subject: Re: SparkSQL errors in 1.4 rc when using with Hive 0.12 metastore

This discussion belongs on the dev list. Please post any replies there.

On Sat, May 23, 2015 at 10:19 PM, Cheolsoo Park piaozhe...@gmail.com wrote:
Hi,

I've been testing SparkSQL in 1.4 rc and found two issues. I wanted to confirm whether these are bugs or not before opening a jira.

1) I can no longer compile SparkSQL with -Phive-0.12.0. I noticed that in 1.4, IsolatedClientLoader is introduced, and different versions of Hive metastore jars can be loaded at runtime. But as a result, SparkSQL no longer compiles with Hive 0.12.0. My question is, is this intended? If so, shouldn't the hive-0.12.0 profile in the POM be removed?

2) After compiling SparkSQL with -Phive-0.13.1, I ran into my 2nd problem. Since I have a Hive 0.12 metastore in production, I have to use it for now. But even if I set spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars, the SparkSQL CLI throws an error as follows:

15/05/24 05:03:29 WARN RetryingMetaStoreClient: MetaStoreClient lost connection. Attempting to reconnect.
org.apache.thrift.TApplicationException: Invalid method name: 'get_functions'
    at org.apache.thrift.TApplicationException.read(TApplicationException.java:108)
    at org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:71)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.recv_get_functions(ThriftHiveMetastore.java:2886)
    at org.apache.hadoop.hive.metastore.api.ThriftHiveMetastore$Client.get_functions(ThriftHiveMetastore.java:2872)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.getFunctions(HiveMetaStoreClient.java:1727)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
    at com.sun.proxy.$Proxy12.getFunctions(Unknown Source)
    at org.apache.hadoop.hive.ql.metadata.Hive.getFunctions(Hive.java:2670)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames(FunctionRegistry.java:674)
    at org.apache.hadoop.hive.ql.exec.FunctionRegistry.getFunctionNames(FunctionRegistry.java:662)
    at org.apache.hadoop.hive.cli.CliDriver.getCommandCompletor(CliDriver.java:540)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:175)
    at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)

What's happening is that when the SparkSQL CLI starts up, it tries to fetch permanent udfs from the Hive metastore (due to HIVE-6330 https://issues.apache.org/jira/browse/HIVE-6330, which was introduced in Hive 0.13). But then it ends up invoking an incompatible thrift function that doesn't exist in Hive 0.12.

To work around this error, I have to comment out the following line of code for now-
https://goo.gl/wcfnH1

My question is, is SparkSQL that is compiled against Hive 0.13 supposed to work with a Hive 0.12 metastore (by setting spark.sql.hive.metastore.version and spark.sql.hive.metastore.jars)? It only works if I comment out the above line of code.

Thanks,
Cheolsoo
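For reference, the two settings discussed in this thread are normally supplied when the CLI is launched; a sketch, with the jar locations as placeholders for wherever the Hive 0.12 and matching Hadoop client jars actually live:

bin/spark-sql \
  --conf spark.sql.hive.metastore.version=0.12.0 \
  --conf spark.sql.hive.metastore.jars=/path/to/hive-0.12.0/lib/*:/path/to/hadoop/client/lib/*

The same keys can go in spark-defaults.conf instead; whether they take effect before the CLI's startup call into the metastore is exactly what this thread is questioning.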
Re: Spark Streaming - Design considerations/Knobs
Blocks are replicated immediately, before the driver launches any jobs using them.

On Thu, May 21, 2015 at 2:05 AM, Hemant Bhanawat hemant9...@gmail.com wrote:
Honestly, given the length of my email, I didn't expect a reply. :-) Thanks for reading and replying. However, I have a follow-up question:

I don't think I understand the block replication completely. Are the blocks replicated immediately after they are received by the receiver? Or are they kept on the receiver node only and moved only on shuffle? Does the replication have something to do with locality.wait?

Thanks,
Hemant

On Thu, May 21, 2015 at 2:21 AM, Tathagata Das t...@databricks.com wrote:
Correcting the ones that are incorrect or incomplete. BUT this is a good list of things to remember about Spark Streaming.

On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com wrote:
Hi,
I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on spark streaming. Is my understanding correct? Any other important design consideration that I should take care of?

- A DStream is associated with a single receiver. For attaining read parallelism multiple receivers i.e. multiple DStreams need to be created.
- A receiver is run within an executor. It occupies one core. Ensure that there are enough cores for processing after receiver slots are booked i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round robin fashion.
- When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval where N = batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
- An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in spark. blockInterval == batchinterval would mean that a single partition is created and probably it is processed locally.
The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that have the blocks, irrespective of block interval, unless non-local scheduling kicks in (as you observed next).
- Having a bigger blockinterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions.
Yes, for greater parallelism. Though it comes at the cost of a shuffle.
- An RDD's processing is scheduled by the driver's jobscheduler as a job. At a given point of time only one job is active. So, if one job is executing, the other jobs are queued.
- If you have two dstreams there will be two RDDs formed and there will be two jobs created, which will be scheduled one after the other.
- To avoid this, you can union the two dstreams. This will ensure that a single unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then considered as a single job. However the partitioning of the RDDs is not impacted.
To further clarify, the jobs depend on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations.
dstream1.union(dstream2).foreachRDD { rdd => rdd.count() }    // one Spark job per batch
dstream1.union(dstream2).foreachRDD { rdd => { rdd.count(); rdd.count() } }    // TWO Spark jobs per batch
dstream1.foreachRDD { rdd => rdd.count }; dstream2.foreachRDD { rdd => rdd.count }    // TWO Spark jobs per batch
- If the batch processing time is more than the batchinterval then obviously the receiver's memory will start filling up and will end up throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver. You can limit the rate of the receiver using the SparkConf config spark.streaming.receiver.maxRate.
- For being fully fault tolerant, spark streaming needs to enable checkpointing. Checkpointing increases the batch processing time.
Incomplete. There are two types of checkpointing - data and metadata. Only data checkpointing, needed by only some operations, increase batch
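Pulling the knobs from this thread into one place, a Scala sketch of where they are set; the values are illustrative only, and the accepted duration formats vary a little between Spark versions:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-knobs")
  .set("spark.cores.max", "8")                        // leave cores free beyond the receiver slots
  .set("spark.streaming.blockInterval", "200ms")      // partitions per receiver = batchInterval / blockInterval
  .set("spark.locality.wait", "1s")                   // higher values favour processing a block on its local node
  .set("spark.streaming.receiver.maxRate", "10000")   // cap on records per second per receiver

val ssc = new StreamingContext(conf, Seconds(2))      // the batchInterval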