Re: External JARs not loading Spark Shell Scala 2.11
FWIW, this is an essential feature to our use of Spark, and I'm surprised it's not advertised clearly as a limitation in the documentation. All I've found about running Spark 1.3 on 2.11 is here: http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211
Also, I'm experiencing some serious stability problems simply trying to run the Spark 1.3 Scala 2.11 REPL. Most of the time it fails to load and spews a torrent of compiler assertion failures, etc. See attached.
spark@dp-cluster-master-node-001:~/spark/bin$ spark-shell
Spark Command: java -cp /opt/spark/conf:/opt/spark/lib/spark-assembly-1.3.2-SNAPSHOT-hadoop2.5.0-cdh5.3.3.jar:/etc/hadoop/conf:/opt/spark/lib/jline-2.12.jar -Dscala.usejavacp=true -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main spark-shell
Welcome to Spark version 1.3.1
Using Scala version 2.11.2 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_40)
Type in expressions to have them evaluated. Type :help for more information. Spark context available as sc.
Exception in thread "main" java.lang.AssertionError: assertion failed: parser: (source: String, options: Map[String,String])org.apache.spark.sql.DataFrame, tailcalls: (source: String, options: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame, tailcalls: (source: String, options: scala.collection.immutable.Map)org.apache.spark.sql.DataFrame at scala.reflect.internal.Symbols$TypeHistory.init(Symbols.scala:3601) at scala.reflect.internal.Symbols$Symbol.rawInfo(Symbols.scala:1521) at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1439) at scala.tools.nsc.transform.SpecializeTypes$$anonfun$23$$anonfun$apply$20.apply(SpecializeTypes.scala:775) at scala.tools.nsc.transform.SpecializeTypes$$anonfun$23$$anonfun$apply$20.apply(SpecializeTypes.scala:768) at scala.collection.immutable.List.flatMap(List.scala:327) at scala.tools.nsc.transform.SpecializeTypes$$anonfun$23.apply(SpecializeTypes.scala:768) at scala.tools.nsc.transform.SpecializeTypes$$anonfun$23.apply(SpecializeTypes.scala:766) at scala.collection.immutable.List.flatMap(List.scala:327) at scala.tools.nsc.transform.SpecializeTypes.specializeClass(SpecializeTypes.scala:766) at scala.tools.nsc.transform.SpecializeTypes.transformInfo(SpecializeTypes.scala:1187) at scala.tools.nsc.transform.InfoTransform$Phase$$anon$1.transform(InfoTransform.scala:38) at scala.reflect.internal.Symbols$Symbol.rawInfo(Symbols.scala:1519) at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1439) at scala.reflect.internal.Symbols$Symbol.isDerivedValueClass(Symbols.scala:775) at scala.reflect.internal.transform.Erasure$ErasureMap.apply(Erasure.scala:131) at scala.reflect.internal.transform.Erasure$ErasureMap.apply(Erasure.scala:144) at scala.reflect.internal.transform.Erasure$class.specialErasure(Erasure.scala:209) at scala.tools.nsc.transform.Erasure.specialErasure(Erasure.scala:15) at scala.reflect.internal.transform.Erasure$class.transformInfo(Erasure.scala:364) at scala.tools.nsc.transform.Erasure.transformInfo(Erasure.scala:348) at scala.tools.nsc.transform.InfoTransform$Phase$$anon$1.transform(InfoTransform.scala:38) at scala.reflect.internal.Symbols$Symbol.rawInfo(Symbols.scala:1519) at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1439) at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anonfun$checkNoDeclaredDoubleDefs$1$$anonfun$apply$mcV$sp$2.apply(Erasure.scala:753) at
scala.tools.nsc.transform.Erasure$ErasureTransformer$$anonfun$checkNoDeclaredDoubleDefs$1$$anonfun$apply$mcV$sp$2.apply(Erasure.scala:753) at scala.reflect.internal.Scopes$Scope.foreach(Scopes.scala:373) at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anonfun$checkNoDeclaredDoubleDefs$1.apply(Erasure.scala:753) at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anonfun$checkNoDeclaredDoubleDefs$1.apply(Erasure.scala:753) at scala.reflect.internal.SymbolTable.enteringPhase(SymbolTable.scala:235) at scala.reflect.internal.SymbolTable.exitingPhase(SymbolTable.scala:256) at scala.tools.nsc.transform.Erasure$ErasureTransformer.checkNoDeclaredDoubleDefs(Erasure.scala:753) at scala.tools.nsc.transform.Erasure$ErasureTransformer.scala$tools$nsc$transform$Erasure$ErasureTransformer$$checkNoDoubleDefs(Erasure.scala:780) at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anon$1.preErase(Erasure.scala:1074) at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anon$1.transform(Erasure.scala:1109) at scala.tools.nsc.transform.Erasure$ErasureTransformer$$anon$1.transform(Erasure.scala:841) at scala.reflect.api.Trees$Transformer.transformTemplate(Trees.scala:2563) at scala.reflect.internal.Trees$$anonfun$itransform$4.apply(Trees.scala:1401) at scala.reflect.internal.Trees$$anonfun$itransform$4.apply(Trees.scala:1400) at
Re: Spark 1.3 saveAsTextFile with codec gives error - works with Spark 1.2
Not sure if this will help, but try clearing your jar cache (for sbt ~/.ivy2 and for maven ~/.m2) directories. Thanks Best Regards On Wed, Apr 15, 2015 at 9:33 PM, Manoj Samel manojsamelt...@gmail.com wrote: Env - Spark 1.3 Hadoop 2.3, Kerbeos xx.saveAsTextFile(path, codec) gives following trace. Same works with Spark 1.2 in same environment val codec = classOf[some codec class] val a = sc.textFile(/some_hdfs_file) a.saveAsTextFile(/some_other_hdfs_file, codec) fails with following trace in Spark 1.3, works in Spark 1.2 in same env 15/04/14 18:06:15 INFO scheduler.TaskSetManager: Lost task 1.3 in stage 2.0 (TID 17) on executor XYZ: java.lang.SecurityException (JCE cannot authenticate the provider BC) [duplicate 7] 15/04/14 18:06:15 INFO cluster.YarnScheduler: Removed TaskSet 2.0, whose tasks have all completed, from pool org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2.0 (TID 16, nodeXYZ): java.lang.SecurityException: JCE cannot authenticate the provider BC at javax.crypto.Cipher.getInstance(Cipher.java:642) at javax.crypto.Cipher.getInstance(Cipher.java:580) some codec calls at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:136) at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:91) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1068) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:744) Caused by: java.util.jar.JarException: file:/abc/filecache/11/spark-assembly-1.3.0-hadoop2.3.0.jar has unsigned entries - org/apache/spark/SparkHadoopWriter$.class at javax.crypto.JarVerifier.verifySingleJar(JarVerifier.java:462) at javax.crypto.JarVerifier.verifyJars(JarVerifier.java:322) at javax.crypto.JarVerifier.verify(JarVerifier.java:250) at javax.crypto.JceSecurity.verifyProviderJar(JceSecurity.java:161) at javax.crypto.JceSecurity.getVerificationResult(JceSecurity.java:187) at javax.crypto.Cipher.getInstance(Cipher.java:638) ... 
16 more Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org http://org.apache.spark.scheduler.dagscheduler.org/ $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
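For reference, the failing call pattern in the report above boils down to the following minimal, self-contained sketch. GzipCodec merely stands in for the unnamed codec class used by the original poster; with the original (crypto-based) codec, this is the path that ends up triggering JCE verification of the provider jar.

import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.{SparkConf, SparkContext}

object SaveWithCodec {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-with-codec"))
    val a = sc.textFile("/some_hdfs_file")
    // Passing the codec class makes the output part files compressed on write.
    a.saveAsTextFile("/some_other_hdfs_file", classOf[GzipCodec])
    sc.stop()
  }
}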
Re: Distinct is very slow
How many tasks are you seeing in your mapToPair stage? Is it 7000? then i suggest you giving a number similar/close to 7000 in your .distinct call, what is happening in your case is that, you are repartitioning your data to a smaller number (32) which would put a lot of load on processing i believe, you can try increasing it. Thanks Best Regards On Fri, Apr 17, 2015 at 1:48 AM, Jeetendra Gangele gangele...@gmail.com wrote: Akhil, any thought on this? On 16 April 2015 at 23:07, Jeetendra Gangele gangele...@gmail.com wrote: No I did not tried the partitioning below is the full code public static void matchAndMerge(JavaRDDVendorRecord matchRdd,JavaSparkContext jsc) throws IOException{ long start = System.currentTimeMillis(); JavaPairRDDLong, MatcherReleventData RddForMarch =matchRdd.zipWithIndex().mapToPair(new PairFunctionTuple2VendorRecord,Long, Long, MatcherReleventData() { @Override public Tuple2Long, MatcherReleventData call(Tuple2VendorRecord, Long t) throws Exception { MatcherReleventData matcherData = new MatcherReleventData(); Tuple2Long, MatcherReleventData tuple = new Tuple2Long, MatcherReleventData(t._2, matcherData.convertVendorDataToMatcherData(t._1)); return tuple; } }).cache(); log.info(after index+RddForMarch.take(1)); MapLong, MatcherReleventData tmp =RddForMarch.collectAsMap(); MapLong, MatcherReleventData matchData = new HashMapLong, MatcherReleventData(tmp); final BroadcastMapLong, MatcherReleventData dataMatchGlobal = jsc.broadcast(matchData); JavaPairRDDLong,String blockingRdd = RddForMarch.flatMapValues(new FunctionMatcherReleventData, IterableString(){ @Override public IterableString call(MatcherReleventData v1) throws Exception { ListString values = new ArrayListString(); HelperUtilities helper1 = new HelperUtilities(); MatcherKeys matchkeys=helper1.getBlockinkeys(v1); if(matchkeys.get_companyName() !=null){ values.add(matchkeys.get_companyName()); } if(matchkeys.get_phoneNumberr() !=null){ values.add(matchkeys.get_phoneNumberr()); } if(matchkeys.get_zipCode() !=null){ values.add(matchkeys.get_zipCode()); } if(matchkeys.getM_domain() !=null){ values.add(matchkeys.getM_domain()); } return values; } }); log.info(blocking RDD is+blockingRdd.count()); int count=0; log.info(Starting printing); for (Tuple2Long, String entry : blockingRdd.collect()) { log.info(entry._1() + : + entry._2()); count++; } log.info(total count+count); JavaPairRDDLong,Integer completeDataToprocess=blockingRdd.flatMapValues( new FunctionString, IterableInteger(){ @Override public IterableInteger call(String v1) throws Exception { return ckdao.getSingelkeyresult(v1); } }).distinct(32); log.info(after hbase count is+completeDataToprocess.count()); log.info(data for process+completeDataToprocess.take(1)); JavaPairRDDLong, Tuple2Integer, Double withScore =completeDataToprocess.mapToPair( new PairFunctionTuple2Long,Integer, Long, Tuple2Integer, Double(){ @Override public Tuple2Long, Tuple2Integer, Double call(Tuple2Long, Integer t) throws Exception { Scoring scoreObj = new Scoring(); double score =scoreObj.computeMatchScore(companyDAO.get(t._2()), dataMatchGlobal.getValue().get(t._1())); Tuple2Integer, Double maptuple = new Tuple2Integer, Double(t._2(), score); Tuple2Long, Tuple2Integer, Double tuple = new Tuple2Long, Tuple2Integer,Double(t._1(), maptuple); return tuple; } }); log.info(with score tuple is+withScore.take(1)); JavaPairRDDLong, Tuple2Integer,Double maxScoreRDD =withScore.reduceByKey( new Function2Tuple2Integer,Double, Tuple2Integer,Double, Tuple2Integer,Double(){ @Override public 
Tuple2Integer, Double call(Tuple2Integer, Double v1, Tuple2Integer, Double v2) throws Exception { int res =v1._2().compareTo(v2._2()); if(res 0){ Tuple2Integer, Double result = new Tuple2Integer, Double(v1._1(), v1._2()); return result; } else if(res0){ Tuple2Integer, Double result = new Tuple2Integer, Double(v2._1(), v2._2()); return result; } else{ Tuple2Integer, Double result = new Tuple2Integer, Double(v2._1(), v2._2()); return result; } } }); log.info(max score RDD+maxScoreRDD.take(10)); maxScoreRDD.foreach( new VoidFunctionTuple2Long,Tuple2Integer,Double(){ @Override public void call(Tuple2Long, Tuple2Integer, Double t) throws Exception { MatcherReleventData matchedData=dataMatchGlobal.getValue().get(t._1()); log.info(broadcast is+dataMatchGlobal.getValue().get(t._1())); //Set the score for better understanding of merge matchedData.setScore(t._2()._2()); vdDoa.updateMatchedRecordWithScore(matchedData, t._2()._1(),Souce_id); } }); log.info(took + (System.currentTimeMillis() - start) + mills to run matcher); } On 16 April 2015 at 22:25, Akhil Das ak...@sigmoidanalytics.com wrote: Can you paste your complete code? Did you try repartioning/increasing level of parallelism to speed up the processing. Since you have 16 cores, and I'm
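To make the numPartitions suggestion at the top of this thread concrete, here is a minimal Scala sketch; the 7000 figure is the assumed upstream task count from the question, and the key/value shape is invented purely for illustration.

import org.apache.spark.{SparkConf, SparkContext}

object DistinctParallelism {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("distinct-parallelism"))
    val pairs = sc.parallelize(1 to 1000000).map(i => (i.toLong % 7000, i % 100))
    // distinct() shuffles; forcing it down to 32 partitions concentrates that shuffle
    // on a few tasks, while a numPartitions close to the upstream stage keeps it spread out.
    val deduped = pairs.distinct(7000)
    println(deduped.count())
    sc.stop()
  }
}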
Re: SparkR: Server IPC version 9 cannot communicate with client version 4
There's a version incompatibility between your hadoop jars. You need to make sure you build your spark with Hadoop 2.5.0-cdh5.3.1 version. Thanks Best Regards On Fri, Apr 17, 2015 at 5:17 AM, lalasriza . lala.s.r...@gmail.com wrote: Dear everyone, right now I am working with SparkR on cluster. The following are the package versions installed on the cluster: 1) Hadoop and Yarn: Hadoop 2.5.0-cdh5.3.1 Subversion http://github.com/cloudera/hadoop -r 4cda8416c73034b59cc8baafbe3666b074472846 Compiled by jenkins on 2015-01-28T00:46Z Compiled with protoc 2.5.0 From source with checksum 6a018149a764de4b8992755df9a2a1b 2) Spark: Spark version 1.2.0 For the SparkR installation, I was following the guide at https://github.com/amplab-extras/SparkR-pkg, by cloning the SparkR-pkg. Then, in SparkR-pkg, I typed: SPARK_VERSION=1.2.0 ./install-dev.sh SPARK_HADOOP_VERSION=2.5.0-cdh5.3.1 ./install-dev.sh After the installation, I tested SparkR as follows: MASTER=spark://xxx:7077 ./sparkR R rdd - parallelize(sc, 1:10) R partitionSum - lapplyPartition(rdd, function(part) { Reduce(+, part) }) R collect(partitionSum) # 15, 40 I got the result perfectly. However, when I try to get a file from HDFS or local file, I always failed. For example, R lines - textFile(sc, hdfs://xxx:8020/user/lala/simulation/README.md) R count(lines) The following are the errors I got: -- collect on 2 failed with java.lang.reflect.InvocationTargetException java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.handleMethodCall(SparkRBackendHandler.scala:111) at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:58) at edu.berkeley.cs.amplab.sparkr.SparkRBackendHandler.channelRead0(SparkRBackendHandler.scala:19) at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) at 
io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116) at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) at java.lang.Thread.run(Thread.java:744) Caused by: org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4 at org.apache.hadoop.ipc.Client.call(Client.java:1070) at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:225) at com.sun.proxy.$Proxy10.getProtocolVersion(Unknown Source) at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:396) at org.apache.hadoop.ipc.RPC.getProxy(RPC.java:379) at org.apache.hadoop.hdfs.DFSClient.createRPCNamenode(DFSClient.java:119) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:238) at org.apache.hadoop.hdfs.DFSClient.init(DFSClient.java:203) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:89) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1386) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:66) at
Re: Task result in Spark Worker Node
Hey Imran, Thanks for the great explanation! This cleared up a lot of things for me. I am actually trying to utilize some of the features within Spark for a system I am developing. I am currently working on developing a subsystem that can be integrated within Spark and other Big Data solutions. In order to integrate it within Spark, I am trying to utilize the rdds and functions provided to the reduce method on my system. My system is developed in Scala and Java. In Spark, I have seen that the function provided to the reduce method, along with the RDD, gets serialized and sent to the worker nodes. The worker nodes are able to deserialize them and then execute the task on them. I see this happening in ResultTask.scala. When I try to do something similar, I get exceptions. The system I am developing has Spark jars in its build path, so it is able to create a SparkContext etc. When I do, val bytes = closureSerializer.serialize((rdd, func) : AnyRef).array() (similar to DAGScheduler.scala) val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) = Int)]( ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader) println(func2(context, rdd2.iterator(rdd2.partitions(1), context))); I get the proper result and can print it out. But when I involve the network by serializing the data, using the network to send it to a different program, then deserialize the data and use the function, I get the following error: Exception in thread main java.lang.NullPointerException at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31) at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30) at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37) at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37) at SimpleApp$.net(SimpleApp.scala:71) at SimpleApp$.main(SimpleApp.scala:76) at SimpleApp.main(SimpleApp.scala) I have also made sure that I am adding the class file of the program that is sending the serialized data to the bin folder of the program that is receiving the data. I’m not sure what I am doing wrong. I’ve done the serialization and creation of the function similar to how Spark does it. I created another reduce function like this. When implemented this way, it prints out the result of func2 properly. But when I involve the network by sending the serialized data to another program, I get the above exception. def reduceMod(f: (Integer, Integer) = Integer): Integer = { val reducePartition: Iterator[Integer] = Option[Integer] = iter = { if (iter.hasNext) { Some(iter.reduceLeft(f)) } else { None } } val processFunc = (context: TaskContext, iter: Iterator[Integer]) = reducePartition(iter) val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) = Int] context = new TaskContextImpl(stageId = 1, partitionId = 1, taskAttemptId = 1, attemptNumber = 1, runningLocally = false) println(func.getClass.getName); println(func(context, rdd.iterator(rdd.partitions(1), context))); val bb = closureSerializer.serialize((rdd, func) : AnyRef).array() val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) = Int)]( ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader) println(func2(context, rdd3.iterator(rdd3.partitions(1), context))); 1 } I was wondering if you had any ideas on what I am doing wrong, or how I can properly send the serialized version of the RDD and function to my other program. 
My thought is that I might need to add more jars to the build path, but I have no clue if thats the issue and what jars I need to add. Thanks, Raghav On Apr 13, 2015, at 10:22 PM, Imran Rashid iras...@cloudera.com wrote: On the worker side, it all happens in Executor. The task result is computed here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210 https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210 then its serialized along with some other goodies, and finally sent back to the driver here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255 https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L255 What happens on the driver is quite a bit more complicated, and involves a number of spots in the code, but at least to get you started, the results are received here:
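As a point of comparison for the serialization experiment described above, here is a minimal sketch of the same serialize/deserialize round trip done with a plain JavaSerializer instance (Spark itself obtains its serializer via SparkEnv.get.closureSerializer). The function and RDD are invented; the key constraint is that whichever process deserializes the bytes must have the defining classes, including the anonymous closure classes, on its classpath, and must not expect Spark runtime state (SparkEnv, TaskContext) that only exists inside a running executor — either gap is a common source of the kind of NullPointerException shown above when a second program is involved.

import java.nio.ByteBuffer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.JavaSerializer

object ClosureRoundTrip {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("closure-round-trip")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 100, 4)

    val ser = new JavaSerializer(conf).newInstance()
    val func: Iterator[Int] => Int = iter => iter.sum

    // Serialize the (RDD, function) pair, similar in spirit to what DAGScheduler does.
    val bytes: ByteBuffer = ser.serialize((rdd, func): AnyRef)

    // Deserialize it again. An executor would call rdd2.iterator(partition, taskContext)
    // here; for a simple smoke test we just apply the deserialized function locally.
    val (rdd2, func2) = ser.deserialize[(RDD[Int], Iterator[Int] => Int)](
      bytes, Thread.currentThread.getContextClassLoader)
    println(func2((1 to 10).iterator))
    sc.stop()
  }
}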
Re: Spark on Windows
Spark 'master' branch (i.e. v1.4.0) builds successfully on Windows 8.1 (Intel i7, 64-bit) with Oracle JDK 8u45, with MAVEN_OPTS set but without the -XX:ReservedCodeCacheSize=1g flag. It takes about 33 minutes. Thanking you. With Regards Sree
On Thursday, April 16, 2015 9:07 PM, Arun Lists lists.a...@gmail.com wrote: Here is what I got from the engineer who worked on building Spark and using it on Windows:
1) Hadoop winutils.exe is needed on Windows, even for local files – and you have to set hadoop.home.dir in spark-class2.cmd (for the two lines with $RUNNER near the end, by adding "-Dhadoop.home.dir=dir") after downloading the Hadoop binaries + winutils.
2) Java/Spark cannot delete the Spark temporary files and it throws an exception (the program still works though). Manual clean-up works just fine, and it is not a permissions issue as it has rights to create the file (I have also tried using my own directory rather than the default, same error).
3) Tried building Spark again, and have attached the log – I don't get any errors, just warnings. However, when I try to use that JAR I just get the error message "Error: Could not find or load main class org.apache.spark.deploy.SparkSubmit".
On Thu, Apr 16, 2015 at 12:19 PM, Arun Lists lists.a...@gmail.com wrote: Thanks, Matei! We'll try that and let you know if it works. You are correct in inferring that some of the problems we had were with dependencies. We also had problems with the spark-submit scripts. I will get the details from the engineer who worked on the Windows builds and provide them to you. arun
On Thu, Apr 16, 2015 at 10:44 AM, Matei Zaharia matei.zaha...@gmail.com wrote: You could build Spark with Scala 2.11 on Mac / Linux and transfer it over to Windows. AFAIK it should build on Windows too, the only problem is that Maven might take a long time to download dependencies. What errors are you seeing? Matei
On Apr 16, 2015, at 9:23 AM, Arun Lists lists.a...@gmail.com wrote: We run Spark on Mac and Linux but also need to run it on Windows 8.1 and Windows Server. We ran into problems with the Scala 2.10 binary bundle for Spark 1.3.0 but managed to get it working. However, on Mac/Linux we are on Scala 2.11.6 (we built Spark from the sources). On Windows, however, despite our best efforts we cannot get Spark 1.3.0 as built from sources working for Scala 2.11.6. Spark has too many moving parts and dependencies! When can we expect to see a binary bundle for Spark 1.3.0 that is built for Scala 2.11.6? I read somewhere that the only reason that Spark 1.3.0 is still built for Scala 2.10 is because Kafka is still on Scala 2.10. For those of us who don't use Kafka, can we have a Scala 2.11 bundle? If there isn't an official bundle arriving any time soon, can someone who has built it for Windows 8.1 successfully please share it with the group? Thanks, arun - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
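Regarding point 1 above, an alternative sometimes used for applications launched directly (the spark-shell scripts still need the -D flag) is setting the property from the program itself before the SparkContext is created. A minimal sketch; the winutils path is a placeholder for wherever the Hadoop binaries were unpacked.

import org.apache.spark.{SparkConf, SparkContext}

object WindowsLocalSpark {
  def main(args: Array[String]): Unit = {
    // Equivalent of -Dhadoop.home.dir=...; the directory must contain bin\winutils.exe.
    System.setProperty("hadoop.home.dir", "C:\\hadoop")
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("win-smoke-test"))
    println(sc.parallelize(1 to 10).sum())
    sc.stop()
  }
}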
Re: aliasing aggregate columns?
FYI.. the problem is that the column names Spark generates are not able to be referenced within SQL or DataFrame operations (i.e. SUM(cool_cnt#725)).. any idea how to alias these final aggregate columns? The syntax below doesn't make sense, but this is what I'd ideally want to do:
.agg({"cool_cnt": "sum".alias("cool_cnt"), "*": "count".alias("cnt")})
On Wed, Apr 15, 2015 at 7:23 PM, elliott cordo elliottco...@gmail.com wrote: Hi Guys - Having trouble figuring out the semantics for using the alias function on the final sum and count aggregations?
cool_summary = reviews.select(reviews.user_id, cool_cnt(votes.cool).alias("cool_cnt")).groupBy("user_id").agg({"cool_cnt": "sum", "*": "count"})
cool_summary
DataFrame[user_id: string, SUM(cool_cnt#725): double, COUNT(1): bigint]
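For reference, the DataFrame API (Spark 1.3) does let you alias the aggregates directly by passing Column expressions to agg instead of the string map. A minimal Scala sketch, assuming a hypothetical coolSummary DataFrame with user_id and cool_cnt columns (the PySpark equivalent goes through pyspark.sql.functions in the same way):

import org.apache.spark.sql.functions.{count, sum}

val summary = coolSummary
  .groupBy("user_id")
  .agg(sum("cool_cnt").as("cool_cnt"), count("user_id").as("cnt"))
// The schema now reads user_id, cool_cnt, cnt instead of SUM(cool_cnt#725), COUNT(1).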
RE: Spark Directed Acyclic Graph / Jobs
I think this paper will be a good resource (https://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf); the Dryad paper is also a good one. Thanks Jerry From: James King [mailto:jakwebin...@gmail.com] Sent: Friday, April 17, 2015 3:26 PM To: user Subject: Spark Directed Acyclic Graph / Jobs Is there a good resource that explains how Spark jobs get broken down to tasks and executions? I just need to get a better understanding of this. Regards j
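Beyond the papers, a quick hands-on way to see the breakdown is toDebugString. A minimal shell-style sketch, assuming an existing SparkContext sc (data and partition counts are made up): each shuffle boundary (reduceByKey here) starts a new stage, and each stage runs one task per partition of its RDD.

val words = sc.parallelize(Seq("a", "b", "a", "c"), 4)
val counts = words.map(w => (w, 1)).reduceByKey(_ + _, 2)
println(counts.toDebugString) // lineage shows two stages: 4 map-side tasks, then 2 reduce-side tasks
counts.collect()              // running an action is what turns those stages into scheduled tasks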
RE: ClassCastException processing date fields using spark SQL since 1.3.0
Hello again, steps to reproduce the same problem in JdbcRDD: - create a table containig Date field in your favourite DBMS, I used PostgreSQL: CREATE TABLE spark_test ( pk_spark_test integer NOT NULL, text character varying(25), date1 date, CONSTRAINT pk PRIMARY KEY (pk_spark_test) ) WITH ( OIDS=FALSE ); ALTER TABLE spark_test OWNER TO postgres; GRANT ALL ON TABLE spark_test TO postgres; GRANT ALL ON TABLE spark_test TO public; - fill it with data: insert into spark_test(pk_spark_test, text, date1) values (1, 'one', '2014-04-01') insert into spark_test(pk_spark_test, text, date1) values (2, 'two', '2014-04-02') - from scala REPL, try the following: import org.apache.spark.sql.SQLContext val sqc = new SQLContext(sc) sqc.jdbc(jdbc:postgresql://localhost:5432/ebx_repository?schema=ebx_repositoryuser=abcpassword=def, spark_test).cache.registerTempTable(spark_test) // don’t forget the cache method sqc.sql(select * from spark_test).foreach(println) the last command will produce the following error (if you don’t use cache, it will produce correct results as expected): 11:50:27.306 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0) java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableAny cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableInt at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.getInt(SpecificMutableRow.scala:248) ~[spark-catalyst_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.IntColumnStats.gatherStats(ColumnStats.scala:191) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:56) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:135) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:111) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.11-1.3.0.jar:1.3.0] at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) ~[spark-core_2.11-1.3.0.jar:1.3.0] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_11] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_11] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_11] 11:50:27.318 [task-result-getter-0] WARN o.a.spark.scheduler.TaskSetManager - Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableAny cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableInt at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.getInt(SpecificMutableRow.scala:248) at
Re: Actor not found
I just checked the codes about creating OutputCommitCoordinator. Could you reproduce this issue? If so, could you provide details about how to reproduce it? Best Regards, Shixiong(Ryan) Zhu 2015-04-16 13:27 GMT+08:00 Canoe canoe...@gmail.com: 13119 Exception in thread main akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkdri...@dmslave13.et2.tbsi te.net:5908/), Path(/user/OutputCommitCoordinator)] 13120 at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) 13121 at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) 13122 at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 13123 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) 13124 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) 13125 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) 13126 at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) 13127 at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 13128 at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) 13129 at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) 13130 at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) 13131 at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) 13132 at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 13133 at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 13134 at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267) 13135 at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508) 13136 at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541) 13137 at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531) 13138 at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87) 13139 at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575) 13140 at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 13141 at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395) 13142 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 13143 at akka.actor.ActorCell.invoke(ActorCell.scala:487) 13144 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 13145 at akka.dispatch.Mailbox.run(Mailbox.scala:220) 13146 at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) 13147 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 13148 at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 13149 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 13150 at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) I met the same problem when I run spark on yarn. Is this a bug or what ? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Actor-not-found-tp22265p22508.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: How to do dispatching in Streaming?
Good use of analogies :) Yep, friction (or entropy in general) exists in everything – but hey, by adding and doing "more work" at the same time (aka more powerful rockets) some people have overcome the friction of the air and even got as far as the moon and beyond. It is all about the bottom line / the big picture – in some models friction can be a huge factor in the equations, in some others it is just part of the landscape.
From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Friday, April 17, 2015 10:12 AM To: Evo Eftimov Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming?
Evo, In Spark there's a fixed scheduling cost for each task, so more tasks mean an increased bottom line for the same amount of work being done. The number of tasks per batch interval should relate to the CPU resources available for the job, following the same 'rule of thumb' as for Spark, being 2-3 times the # of cores. In that physical model presented before, I think we could consider this scheduling cost as a form of friction. -kr, Gerard.
On Thu, Apr 16, 2015 at 11:47 AM, Evo Eftimov evo.efti...@isecc.com wrote: Ooops – what does "more work" mean in a Parallel Programming paradigm, and does it always translate into "inefficiency"? Here are a few laws of physics in this space: 1. More Work, if done AT THE SAME time AND fully utilizing the cluster resources, is a GOOD thing. 2. More Work which can not be done at the same time and has to be processed sequentially is a BAD thing. So the key is whether it is about 1 or 2, and if it is about 1, whether it leads to e.g. Higher Throughput and Lower Latency or not. Regards, Evo Eftimov
From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Thursday, April 16, 2015 10:41 AM To: Evo Eftimov Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming?
From experience, I'd recommend using the dstream.foreachRDD method and doing the filtering within that context. Extending the example of TD, something like this:
dstream.foreachRDD { rdd =>
  rdd.cache()
  messageType.foreach { msgTyp =>
    val selection = rdd.filter(msgTyp.match(_))
    selection.foreach { ... }
  }
  rdd.unpersist()
}
I would discourage the use of:
MessageType1DStream = MainDStream.filter(message type1)
MessageType2DStream = MainDStream.filter(message type2)
MessageType3DStream = MainDStream.filter(message type3)
Because it will be a lot more work to process on the Spark side. Each DStream will schedule tasks for each partition, resulting in #dstream x #partitions x #stages tasks instead of the #partitions x #stages with the approach presented above. -kr, Gerard.
On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com wrote: And yet another way is to demultiplex at one point, which will yield separate DStreams for each message type which you can then process in independent DAG pipelines in the following way:
MessageType1DStream = MainDStream.filter(message type1)
MessageType2DStream = MainDStream.filter(message type2)
MessageType3DStream = MainDStream.filter(message type3)
Then proceed with your processing independently with MessageType1DStream, MessageType2DStream and MessageType3DStream, i.e. each of them is a starting point of a new DAG pipeline running in parallel.
From: Tathagata Das [mailto:t...@databricks.com] Sent: Thursday, April 16, 2015 12:52 AM To: Jianshi Huang Cc: user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming?
It may be worthwhile to architect the computation in a different way.
dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // do different things for each record based on filters
  }
}
TD On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I have a Kafka topic that contains dozens of different types of messages. And for each one I'll need to create a DStream for it. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream -> multiple DStreams) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
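Pulling the foreachRDD dispatching idea discussed above together into a compilable form, here is a minimal sketch. It assumes records are plain strings, and the per-type predicate and output action (startsWith and println) are placeholders for the real ones:

import org.apache.spark.streaming.dstream.DStream

def dispatch(dstream: DStream[String], messageTypes: Seq[String]): Unit = {
  dstream.foreachRDD { rdd =>
    rdd.cache() // materialise the batch once, then run one filter pass per message type
    messageTypes.foreach { msgType =>
      val selection = rdd.filter(_.startsWith(msgType)) // placeholder predicate
      selection.foreach(record => println(s"$msgType -> $record"))
    }
    rdd.unpersist()
  }
}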
Some questions on Multiple Streams
Hi, I am working with multiple Kafka streams (23 streams) and currently I am processing them separately. I receive one stream from each topic. I have the following questions. 1. Spark streaming guide suggests to union these streams. Is it possible to get statistics of each stream even after they are unioned? 2. My calculations are not complex. I use 2 second batch interval and if I use 2 streams they get easily processed under 2 seconds by a single core. There is some shuffling involved in my application. As I increase the number of streams and the number of executors accordingly, the applications scheduling delay increases and become unmanageable in 2 seconds. As I believe this happens because with that many streams, the number of tasks increases thus the shuffling magnifies and also that all streams using the same executors. Is it possible to provide part of executors to particular stream while processing streams simultaneously? E.g. if I have 15 cores on cluster and 5 streams, 5 cores will be taken by 5 receivers and of the rest 10, can I provide 2 cores each to one of the 5 streams. Just to add, increasing the batch interval does help but I don't want to increase the batch size due to application restrictions and delayed results (The blockInterval and defaultParallelism does help to a limited extent). Please see attach file for CODE SNIPPET Regards,Laeeq //Setting system properties val conf = new SparkConf().setMaster(spark://10.1.4.90:7077).setAppName(StreamAnomalyDetector) .setSparkHome(System.getenv(SPARK_HOME)) .setJars(List(target/scalaad-1.0-SNAPSHOT-jar-with-dependencies.jar)) .set(spark.executor.memory, 6g) .set(spark.executor.logs.rolling.strategy, size) .set(spark.executor.logs.rolling.size.maxBytes, 1024) .set(spark.executor.logs.rolling.maxRetainedFiles, 3) .set(spark.speculation,true) .set(spark.locality.wait,1000) .set(spark.streaming.unpersist,true) .set(spark.streaming.blockInterval,100) .set(spark.default.parallelism,10) val zkQuorum = 10.1.4.144:2181,10.1.4.145:2181,10.1.4.146:2181,10.1.4.147:2181,10.1.4.148:2181 val group = test-group // Create the context val ssc = new StreamingContext(conf, Seconds(2)) //hdfs path to checkpoint old data ssc.checkpoint(hdfs://host-10-1-4-90.novalocal:9000/user/hduser/checkpointing) // Create the KafkaDStream for (a - 0 to (args.length - 1)) { val eegStreams = KafkaUtils.createStream(ssc, zkQuorum, group, Map(args(a) - 1),StorageLevel.MEMORY_ONLY).map(_._2) val keyAndValues = eegStreams.map(x= { val token = x.split(,) (token(0),token(1),token(2)) }).persist(StorageLevel.MEMORY_ONLY) val timeAndFile = keyAndValues.map(x= (math.round(x._1.toDouble),x._2)).window(Seconds(2), Seconds(2)) val firstTimeAndFile = timeAndFile.transform( rdd = rdd.context.makeRDD(rdd.sortByKey(true).take(1))).map(x=(1L,(x._1,x._2))) val counts = keyAndValues.map(x = math.round(x._3.toDouble)).countByValueAndWindow(Seconds(2),Seconds(2)) val topCounts = counts.map(_.swap).transform( rdd = rdd.context.makeRDD(rdd.top(60), 10)) val absoluteTopCounts = topCounts.map(x = (math.abs(x._2)*x._1 , x._1 )).reduce((a, b) = (a._1 + b._1, a._2 + b._2)) val windowedFWA = absoluteTopCounts.map(x = (x._1.toFloat/x._2)) //Frequency Weighted Amplitude for Normal Data //CMA Stands for Cumulative Moving Average of Frequency Weighted Amplitude val CMA = windowedFWA.map(r = (1,(r.toDouble,1,1))).updateStateByKey[(Double,Int,Int)](updateSum).map(_._2) val anomaly = CMA.map(x = (1L , x._3)) val joinedResult = anomaly.join(firstTimeAndFile) joinedResult.map(x = 
%s,%s,%s.format(x._2._2._2, x._2._2._1, x._2._1)).saveAsTextFiles(hdfs://host-10-1-4-90.novalocal:9000/user/hduser/output/ + (a+1)) joinedResult.print } ssc.start() ssc.awaitTermination() } } - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
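On question 1: a minimal sketch of the union approach that still keeps per-stream statistics, by tagging every record with its topic before the union. Topic names, receiver parallelism and the tagging map are placeholders; the real keyAndValues parsing from the snippet above would replace the simple tagging.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka.KafkaUtils

def unionedByTopic(ssc: StreamingContext, zkQuorum: String, group: String,
                   topics: Seq[String]): DStream[(String, String)] = {
  val perTopic = topics.map { topic =>
    KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
      .map { case (_, value) => (topic, value) } // keep the topic so per-stream stats survive the union
  }
  // One unioned DStream; per-topic statistics remain possible by keying on the topic field.
  ssc.union(perTopic)
}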
Re: Task result in Spark Worker Node
My apologies, I had pasted the wrong exception trace in the previous email. Here is the actual exception that I am receiving. Exception in thread main java.lang.NullPointerException at org.apache.spark.rdd.ParallelCollectionRDD$.slice(ParallelCollectionRDD.scala:154) at org.apache.spark.rdd.ParallelCollectionRDD.getPartitions(ParallelCollectionRDD.scala:97) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at org.apache.spark.rdd.RDD.partitions(RDD.scala:217) On Apr 17, 2015, at 2:30 AM, Raghav Shankar raghav0110...@gmail.com wrote: Hey Imran, Thanks for the great explanation! This cleared up a lot of things for me. I am actually trying to utilize some of the features within Spark for a system I am developing. I am currently working on developing a subsystem that can be integrated within Spark and other Big Data solutions. In order to integrate it within Spark, I am trying to utilize the rdds and functions provided to the reduce method on my system. My system is developed in Scala and Java. In Spark, I have seen that the function provided to the reduce method, along with the RDD, gets serialized and sent to the worker nodes. The worker nodes are able to deserialize them and then execute the task on them. I see this happening in ResultTask.scala. When I try to do something similar, I get exceptions. The system I am developing has Spark jars in its build path, so it is able to create a SparkContext etc. When I do, val bytes = closureSerializer.serialize((rdd, func) : AnyRef).array() (similar to DAGScheduler.scala) val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) = Int)]( ByteBuffer.wrap(bytes), Thread.currentThread.getContextClassLoader) println(func2(context, rdd2.iterator(rdd2.partitions(1), context))); I get the proper result and can print it out. But when I involve the network by serializing the data, using the network to send it to a different program, then deserialize the data and use the function, I get the following error: Exception in thread main java.lang.NullPointerException at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:36) at SimpleApp$$anonfun$1.apply(SimpleApp.scala:31) at SimpleApp$$anonfun$1.apply(SimpleApp.scala:30) at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37) at SimpleApp$$anonfun$2.apply(SimpleApp.scala:37) at SimpleApp$.net(SimpleApp.scala:71) at SimpleApp$.main(SimpleApp.scala:76) at SimpleApp.main(SimpleApp.scala) I have also made sure that I am adding the class file of the program that is sending the serialized data to the bin folder of the program that is receiving the data. I’m not sure what I am doing wrong. I’ve done the serialization and creation of the function similar to how Spark does it. I created another reduce function like this. When implemented this way, it prints out the result of func2 properly. But when I involve the network by sending the serialized data to another program, I get the above exception. 
def reduceMod(f: (Integer, Integer) = Integer): Integer = { val reducePartition: Iterator[Integer] = Option[Integer] = iter = { if (iter.hasNext) { Some(iter.reduceLeft(f)) } else { None } } val processFunc = (context: TaskContext, iter: Iterator[Integer]) = reducePartition(iter) val func = processFunc.asInstanceOf[(TaskContext, Iterator[Int]) = Int] context = new TaskContextImpl(stageId = 1, partitionId = 1, taskAttemptId = 1, attemptNumber = 1, runningLocally = false) println(func.getClass.getName); println(func(context, rdd.iterator(rdd.partitions(1), context))); val bb = closureSerializer.serialize((rdd, func) : AnyRef).array() val (rdd2, func2) = closureSerializer.deserialize[(RDD[Int], (TaskContext, Iterator[Int]) = Int)]( ByteBuffer.wrap(bb), Thread.currentThread.getContextClassLoader) println(func2(context, rdd3.iterator(rdd3.partitions(1), context))); 1 } I was wondering if you had any ideas on what I am doing wrong, or how I can properly send the serialized version of the RDD and function to my other program. My thought is that I might need to add more jars to the build path, but I have no clue if thats the issue and what jars I need to add. Thanks, Raghav On Apr 13, 2015, at 10:22 PM, Imran Rashid iras...@cloudera.com mailto:iras...@cloudera.com wrote: On the worker side, it all happens in Executor. The task result is computed here: https://github.com/apache/spark/blob/b45059d0d7809a986ba07a447deb71f11ec6afe4/core/src/main/scala/org/apache/spark/executor/Executor.scala#L210
Re: Random pairs / RDD order
Hi Sean, Thanks a lot for your reply. The problem is that I need to sample random *independent* pairs. If I draw two samples and build all n*(n-1) pairs then there is a lot of dependency. My current solution is also not satisfying because some pairs (the closest ones in a partition) have a much higher probability to be sampled. Not sure how to fix this. Aurelien Le 16/04/2015 20:44, Sean Owen a écrit : Use mapPartitions, and then take two random samples of the elements in the partition, and return an iterator over all pairs of them? Should be pretty simple assuming your sample size n is smallish since you're returning ~n^2 pairs. On Thu, Apr 16, 2015 at 7:00 PM, abellet aurelien.bel...@telecom-paristech.fr wrote: Hi everyone, I have a large RDD and I am trying to create a RDD of a random sample of pairs of elements from this RDD. The elements composing a pair should come from the same partition for efficiency. The idea I've come up with is to take two random samples and then use zipPartitions to pair each i-th element of the first sample with the i-th element of the second sample. Here is a sample code illustrating the idea: --- val rdd = sc.parallelize(1 to 6, 16) val sample1 = rdd.sample(true,0.01,42) val sample2 = rdd.sample(true,0.01,43) def myfunc(s1: Iterator[Int], s2: Iterator[Int]): Iterator[String] = { var res = List[String]() while (s1.hasNext s2.hasNext) { val x = s1.next + + s2.next res ::= x } res.iterator } val pairs = sample1.zipPartitions(sample2)(myfunc) - However I am not happy with this solution because each element is most likely to be paired with elements that are closeby in the partition. This is because sample returns an ordered Iterator. Any idea how to fix this? I did not find a way to efficiently shuffle the random sample so far. Thanks a lot! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Random-pairs-RDD-order-tp22529.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
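One way to weaken the "close neighbours get paired" bias is to shuffle each sampled partition in memory before zipping. A minimal sketch, under the assumption that the per-partition samples are small enough to buffer (they are small by construction, given the sampling fraction):

import scala.util.Random
import org.apache.spark.rdd.RDD

def randomPairs(rdd: RDD[Int], fraction: Double, seed: Long): RDD[(Int, Int)] = {
  def sampledAndShuffled(s: Long): RDD[Int] =
    rdd.sample(withReplacement = true, fraction, s)
      .mapPartitions(iter => Random.shuffle(iter.toVector).iterator,
        preservesPartitioning = true)
  // Both samples come from the same RDD, so partition counts match and zipPartitions
  // pairs the i-th shuffled element of one sample with the i-th of the other.
  sampledAndShuffled(seed).zipPartitions(sampledAndShuffled(seed + 1)) {
    (s1, s2) => s1.zip(s2)
  }
}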
Re: External JARs not loading Spark Shell Scala 2.11
Doesn't this reduce to Scala isn't compatible with itself across maintenance releases? Meaning, if this were fixed then Scala 2.11.{x 6} would have similar failures. It's not not-ready; it's just not the Scala 2.11.6 REPL. Still, sure I'd favor breaking the unofficial support to at least make the latest Scala 2.11 the unbroken one. On Fri, Apr 17, 2015 at 7:58 AM, Michael Allman mich...@videoamp.com wrote: FWIW, this is an essential feature to our use of Spark, and I'm surprised it's not advertised clearly as a limitation in the documentation. All I've found about running Spark 1.3 on 2.11 is here: http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211 Also, I'm experiencing some serious stability problems simply trying to run the Spark 1.3 Scala 2.11 REPL. Most of the time it fails to load and spews a torrent of compiler assertion failures, etc. See attached. Unfortunately, it appears the Spark 1.3 Scala 2.11 REPL is simply not ready for production use. I was going to file a bug, but it seems clear that the current implementation is going to need to be forward-ported to Scala 2.11.6 anyway. We already have an issue for that: https://issues.apache.org/jira/browse/SPARK-6155 Michael On Apr 9, 2015, at 10:29 PM, Prashant Sharma scrapco...@gmail.com wrote: You will have to go to this commit ID 191d7cf2a655d032f160b9fa181730364681d0e7 in Apache spark. [1] Once you are at that commit, you need to review the changes done to the repl code and look for the relevant occurrences of the same code in scala 2.11 repl source and somehow make it all work. Thanks, 1. http://githowto.com/getting_old_versions Prashant Sharma On Thu, Apr 9, 2015 at 4:40 PM, Alex Nakos ana...@gmail.com wrote: Ok, what do i need to do in order to migrate the patch? Thanks Alex On Thu, Apr 9, 2015 at 11:54 AM, Prashant Sharma scrapco...@gmail.com wrote: This is the jira I referred to https://issues.apache.org/jira/browse/SPARK-3256. Another reason for not working on it is evaluating priority between upgrading to scala 2.11.5(it is non trivial I suppose because repl has changed a bit) or migrating that patch is much simpler. Prashant Sharma On Thu, Apr 9, 2015 at 4:16 PM, Alex Nakos ana...@gmail.com wrote: Hi- Was this the JIRA issue? https://issues.apache.org/jira/browse/SPARK-2988 Any help in getting this working would be much appreciated! Thanks Alex On Thu, Apr 9, 2015 at 11:32 AM, Prashant Sharma scrapco...@gmail.com wrote: You are right this needs to be done. I can work on it soon, I was not sure if there is any one even using scala 2.11 spark repl. Actually there is a patch in scala 2.10 shell to support adding jars (Lost the JIRA ID), which has to be ported for scala 2.11 too. If however, you(or anyone else) are planning to work, I can help you ? Prashant Sharma On Thu, Apr 9, 2015 at 3:08 PM, anakos ana...@gmail.com wrote: Hi- I am having difficulty getting the 1.3.0 Spark shell to find an external jar. I have build Spark locally for Scala 2.11 and I am starting the REPL as follows: bin/spark-shell --master yarn --jars data-api-es-data-export-4.0.0.jar I see the following line in the console output: 15/04/09 09:52:15 INFO spark.SparkContext: Added JAR file:/opt/spark/spark-1.3.0_2.11-hadoop2.3/data-api-es-data-export-4.0.0.jar at http://192.168.115.31:54421/jars/data-api-es-data-export-4.0.0.jar with timestamp 1428569535904 but when i try to import anything from this jar, it's simply not available. 
When I try to add the jar manually using the command :cp /path/to/jar the classes in the jar are still unavailable. I understand that 2.11 is not officially supported, but has anyone been able to get an external jar loaded in the 1.3.0 release? Is this a known issue? I have tried searching around for answers but the only thing I've found that may be related is this: https://issues.apache.org/jira/browse/SPARK-3257 Any/all help is much appreciated. Thanks Alex -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/External-JARs-not-loading-Spark-Shell-Scala-2-11-tp22434.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: General configurations on CDH5 to achieve maximum Spark Performance
And btw if you suspect this is a YARN issue you can always launch and use Spark in a Standalone Mode which uses its own embedded cluster resource manager - this is possible even when Spark has been deployed on CDH under YARN by the pre-canned install scripts of CDH To achieve that: 1. Launch spark in a standalone mode using its shell scripts - you may get some script errors initially because of some mess in the scripts created by the pre-canned CDH YARN install - which you can fix by editing the spark standalone scripts - the error messages will guide you 2. Submit a spark job to the standalone spark master rather than YARN and this is it 3. Measure and compare the performance under YARN, Spark Standalone on Cluster and Spark Standalone on a single machine Bear in mind that running Spark in Standalone mode while using YARN for all other apps would not be very appropriate in production because the two resource managers will be competing for cluster resources - but you can use this for performance tests From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 6:28 PM To: 'Manish Gupta 8'; 'user@spark.apache.org' Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance Essentially to change the performance yield of software cluster infrastructure platform like spark you play with different permutations of: - Number of CPU cores used by Spark Executors on every cluster node - Amount of RAM allocated for each executor How disks and network IO is used also plays a role but that is influenced more by app algorithmic aspects rather than YARN / Spark cluster config (except rack awreness etc) When Spark runs under the management of YARN the above is controlled / allocated by YARN https://spark.apache.org/docs/latest/running-on-yarn.html From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:21 PM To: Evo Eftimov; user@spark.apache.org Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance Thanks Evo. Yes, my concern is only regarding the infrastructure configurations. Basically, configuring Yarn (Node manager) + Spark is must and default setting never works. And what really happens, is we make changes as and when an issue is faced because of one of the numerous default configuration settings. And every time, we have to google a lot to decide on the right values J Again, my issue is very centric to running Spark on Yarn in CDH5 environment. If you know a link that talks about optimum configuration settings for running Spark on Yarn (CDH5), please share the same. Thanks, Manish From: Evo Eftimov [mailto:evo.efti...@isecc.com] Sent: Thursday, April 16, 2015 10:38 PM To: Manish Gupta 8; user@spark.apache.org Subject: RE: General configurations on CDH5 to achieve maximum Spark Performance Well there are a number of performance tuning guidelines in dedicated sections of the spark documentation - have you read and applied them Secondly any performance problem within a distributed cluster environment has two aspects: 1. Infrastructure 2. App Algorithms You seem to be focusing only on 1, but what you said about the performance differences between single laptop and cluster points to potential algorithmic inefficiency in your app when e.g. distributing and performing parallel processing and data. On a single laptop data moves instantly between workers because all worker instances run in the memory of a single machine .. 
Regards, Evo Eftimov From: Manish Gupta 8 [mailto:mgupt...@sapient.com] Sent: Thursday, April 16, 2015 6:03 PM To: user@spark.apache.org Subject: General configurations on CDH5 to achieve maximum Spark Performance Hi, Is there a document/link that describes the general configuration settings to achieve maximum Spark Performance while running on CDH5? In our environment, we did a lot of changes (and are still doing so) to get decent performance, otherwise our 6-node dev cluster with default configurations lags behind a single laptop running Spark. Having a standard checklist (taking a base node size of 4-CPU, 16GB RAM) would be really great. Any pointers in this regard will be really helpful. We are running Spark 1.2.0 on CDH 5.3.0. Thanks, Manish Gupta Specialist | Sapient Global Markets Green Boulevard (Tower C) 3rd 4th Floor Plot No. B-9A, Sector 62 Noida 201 301 Uttar Pradesh, India Tel: +91 (120) 479 5000 Fax: +91 (120) 479 5001 Email: mgupt...@sapient.com sapientglobalmarkets.com
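A minimal sketch of the two knobs Evo mentions (cores and RAM per executor), expressed in Scala as a SparkConf rather than spark-submit flags. The master URL, app name and values are hypothetical; spark.cores.max is the standalone-mode cap on total cores for the app.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical values, purely to illustrate the two knobs discussed above
val conf = new SparkConf()
  .setAppName("PerfTest")
  .setMaster("spark://master-host:7077")   // standalone master instead of YARN
  .set("spark.executor.memory", "4g")      // RAM allocated to each executor
  .set("spark.cores.max", "8")             // total CPU cores the app may use (standalone mode)

val sc = new SparkContext(conf)

The same settings can also be passed on the spark-submit command line; the point is simply that these two parameters are what you permute when comparing YARN against standalone runs.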
RE: ClassCastException processing date fields using spark SQL since 1.3.0
Normally I use something like the following in Scala:

case class datetest(x: Int, y: java.sql.Date)
val dt = sc.parallelize(1 to 3).map(p => datetest(p, new java.sql.Date(p*1000*60*60*24)))
sqlContext.createDataFrame(dt).registerTempTable("t1")
sql("select * from t1").collect.foreach(println)

If you still meet exceptions, please let me know about your query. The implicit conversion should be driven when you call createDataFrame. Thanks, Daoyuan From: Krist Rastislav [mailto:rkr...@vub.sk] Sent: Friday, April 17, 2015 3:52 PM To: Wang, Daoyuan; Michael Armbrust Cc: user Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0 Hello, thank You for Your answer – I am creating the DataFrames manually using org.apache.spark.sql.SQLContext#createDataFrame. The RDD is my custom implementation encapsulating invocation of a remote REST-based web service, and the schema is created programmatically from metadata (obtained from the same WS). So in other words, the creation of Rows in the DataFrame is fully under my control and the implicit conversion thus cannot occur. Is there any best practice (ideally a utility method) for creating a Row instance from a set of values of the types represented by the DataFrame schema? I will try to take a deeper look into Your source code to locate the definition of the implicit conversion, but maybe some hint from Your side could help deliver a better implementation. Thank You very much for Your help (and for the great work you are doing there). Regards R.Krist From: Wang, Daoyuan [mailto:daoyuan.w...@intel.com] Sent: Friday, April 17, 2015 5:08 AM To: Michael Armbrust; Krist Rastislav Cc: user Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0 The conversion between date and int should be automatically handled by implicit conversion. So we are accepting date types externally, and representing them as integers internally. From: Wang, Daoyuan Sent: Friday, April 17, 2015 11:00 AM To: 'Michael Armbrust'; rkrist Cc: user Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0 Can you tell us how you created the dataframe? From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Friday, April 17, 2015 2:52 AM To: rkrist Cc: user Subject: Re: ClassCastException processing date fields using spark SQL since 1.3.0 Filed: https://issues.apache.org/jira/browse/SPARK-6967 Shouldn't they be null? Statistics are only used to eliminate partitions that can't possibly hold matching values. So while you are right this might result in a false positive, that will not result in a wrong answer.
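A minimal sketch of the kind of utility Krist asks about, assuming the schema is assembled programmatically and that sc / sqlContext are the usual shell entry points; the field names below are hypothetical. The idea is to build a StructType, produce an RDD[Row] whose values use the external Spark SQL types (java.sql.Date for DateType), and pass both to createDataFrame.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, DateType}

// Hypothetical schema built from metadata, mirroring what the web service would describe
val schema = StructType(Seq(
  StructField("x", IntegerType, nullable = false),
  StructField("y", DateType, nullable = true)))

// Values must use the external types Spark SQL expects (java.sql.Date for DateType)
val rows = sc.parallelize(1 to 3).map(p => Row(p, new java.sql.Date(p.toLong * 1000 * 60 * 60 * 24)))

val df = sqlContext.createDataFrame(rows, schema)

Row.fromSeq(values) can be used instead of Row(values: _*) when the values are already in a Seq produced from the service response.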
Path issue in running spark
A very basic but strange problem: On running the master I am getting the following error. My java path is proper, however the spark-class file is throwing an error because bin/java is duplicated in the path string. Can anybody explain why it is doing this? Error: /bin/spark-class: line 190: exec: /usr/lib/jvm/java-8-oracle/jre/bin/java/bin/java: cannot execute: Not a directory -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Path-issue-in-running-spark-tp22536.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Addition of new Metrics for killed executors.
Hi, We are planning to add new metrics in Spark for the executors that got killed during the execution. Was just curious why this info is not already present. Is there some reason for not adding it? Any ideas around this are welcome. Thanks and Regards, Archit Thakur.
Re: Joined RDD
map phase of join* On Fri, Apr 17, 2015 at 5:28 PM, Archit Thakur archit279tha...@gmail.com wrote: Ajay, This is true. When we call join again on two RDDs, rather than computing the whole pipeline again, it reads the map output of the map phase of an RDD (which it usually gets from the shuffle manager). If you see the code:

override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
  val sparkConf = SparkEnv.get.conf
  val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
  for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
    case NarrowCoGroupSplitDep(rdd, _, itsSplit) =>
      // Read them from the parent
      val it = rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]]
      rddIterators += ((it, depNum))
    case ShuffleCoGroupSplitDep(handle) =>
      // Read map outputs of shuffle
      val it = SparkEnv.get.shuffleManager
        .getReader(handle, split.index, split.index + 1, context)
        .read()
      rddIterators += ((it, depNum))
  }

This is CoGroupedRDD.scala, spark-1.3 code. If you see the UI, it shows these map stages as skipped. (And this answers your question as well, Hoai-Thu Vong [in a different thread about skipped stages].) Thanks and Regards, Archit Thakur. On Thu, Nov 13, 2014 at 3:10 PM, ajay garg ajay.g...@mobileum.com wrote: Yes that is my understanding of how it should work. But in my case when I call collect the first time, it reads the data from files on the disk. Subsequent collect queries are not reading the data files (verified from the logs). On the Spark UI I see only shuffle read and no shuffle write. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Joined-RDD-tp18820p18829.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
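A small sketch of the behaviour Archit describes, using two hypothetical pair RDDs: running a second action on the same joined RDD re-uses the shuffle map output, which is what shows up as "skipped" stages in the UI.

// Hypothetical pair RDDs, just to show the shuffle re-use behaviour described above
val rdd1 = sc.parallelize(1 to 1000).map(i => (i % 10, i))
val rdd2 = sc.parallelize(1 to 1000).map(i => (i % 10, i * 2))

val joined = rdd1.join(rdd2)
joined.count()   // first action: runs the map side of the join and writes shuffle output
joined.count()   // second action: the map stages show as "skipped" in the UI, because the
                 // reduce side simply re-reads the existing map output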
Re: Custom partioner
Hi Archit Thanks for the reply. How can I do the custom compilation to reduce it to 4 bytes? I want to make it 4 bytes in any case, can you please guide? I am applying flatMapValues in each step after zipWithIndex, so it should stay on the same node right? Why is it shuffling? Also I am running with very few records currently, still it is shuffling? regards jeetendra On 17 April 2015 at 15:58, Archit Thakur archit279tha...@gmail.com wrote: I don't think you can change it to 4 bytes without any custom compilation. To make the same key go to the same node, you'll have to repartition the data, which is shuffling anyway. Unless your raw data is such that the same key is on the same node, you'll have to shuffle at least once to make the same key land on the same node. On Thu, Apr 16, 2015 at 10:16 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All I have an RDD which has 1 million keys and each key is repeated for around 7000 values, so in total there will be around 1M*7K records in the RDD. Each key is created from zipWithIndex so keys run from 0 to M-1. The problem with zipWithIndex is it takes a Long for the key, which is 8 bytes. Can I reduce it to 4 bytes? Now how can I make sure records with the same key will go to the same node so that I can avoid shuffling? Also how will the default partitioner work here? Regards jeetendra
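A hedged sketch of what Archit suggests: shuffle once by partitioning explicitly, then keep the partitioned RDD around so later keyed operations on the same partitioner do not reshuffle. The records RDD and the partition count are hypothetical.

import org.apache.spark.HashPartitioner
import org.apache.spark.storage.StorageLevel

// Hypothetical input RDD; partition once and keep the result, so later keyed operations
// that use the same partitioner don't shuffle the data again
val keyed = records.zipWithIndex().map { case (rec, idx) => (idx, rec) }   // (Long, record) pairs
val partitioned = keyed
  .partitionBy(new HashPartitioner(16))
  .persist(StorageLevel.MEMORY_AND_DISK)

partitioned.count()   // materialise it once; joins/aggregations that reuse this partitioner
                      // can then work co-located instead of reshuffling every time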
Re: Executor memory in web UI
This is the fraction available for caching, which is 60% * 90% * total by default. On Fri, Apr 17, 2015 at 11:30 AM, podioss grega...@hotmail.com wrote: Hi, i am a bit confused with the executor-memory option. I am running applications with Standalone cluster manager with 8 workers with 4gb memory and 2 cores each and when i submit my application with spark-submit i use --executor-memory 1g. In the web ui in the completed applications table i see that my application was correctly submitted with 1g memory per node as expected but when i check the executors tab of the application i see that every executor launched with 530mb which is about half the memory of the configuration. I would really appreciate an explanation if anyone knew. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-memory-in-web-UI-tp22538.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
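The arithmetic behind that answer, written out as a small sketch with the Spark 1.x default settings assumed:

// Rough sketch of where the ~530 MB figure comes from with the 1.x defaults
val executorHeapMb  = 1024.0   // --executor-memory 1g
val memoryFraction  = 0.6      // spark.storage.memoryFraction (default)
val safetyFraction  = 0.9      // spark.storage.safetyFraction (default)
val cacheMb = executorHeapMb * memoryFraction * safetyFraction   // ≈ 553 MB
// The UI actually starts from Runtime.getRuntime.maxMemory, which is slightly below the
// configured 1g, which is why it reports roughly 530 MB rather than 553 MB.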
Re: Spark on Windows
Thanks, Sree! Are you able to run your applications using spark-submit? Even after we were able to build successfully, we ran into problems with running the spark-submit script. If everything worked correctly for you, we can hope that things will be smoother when 1.4.0 is made generally available. arun On Thu, Apr 16, 2015 at 10:18 PM, Sree V sree_at_ch...@yahoo.com wrote: Spark 'master' branch (i.e. v1.4.0) builds successfully on Windows 8.1, Intel i7 64-bit, with Oracle JDK 8u45, with the usual Maven opts but without the flag -XX:ReservedCodeCacheSize=1g. It takes about 33 minutes. Thanking you. With Regards Sree On Thursday, April 16, 2015 9:07 PM, Arun Lists lists.a...@gmail.com wrote: Here is what I got from the engineer who worked on building Spark and using it on Windows: 1) Hadoop winutils.exe is needed on Windows, even for local files – and you have to set hadoop.home.dir in spark-class2.cmd (for the two lines with $RUNNER near the end) by adding “-Dhadoop.home.dir=dir” after downloading the Hadoop binaries + winutils. 2) Java/Spark cannot delete the spark temporary files and it throws an exception (the program still works though). Manual clean-up works just fine, and it is not a permissions issue as it has rights to create the file (I have also tried using my own directory rather than the default, same error). 3) Tried building Spark again, and have attached the log – I don’t get any errors, just warnings. However when I try to use that JAR I just get the error message “Error: Could not find or load main class org.apache.spark.deploy.SparkSubmit”. On Thu, Apr 16, 2015 at 12:19 PM, Arun Lists lists.a...@gmail.com wrote: Thanks, Matei! We'll try that and let you know if it works. You are correct in inferring that some of the problems we had were with dependencies. We also had problems with the spark-submit scripts. I will get the details from the engineer who worked on the Windows builds and provide them to you. arun On Thu, Apr 16, 2015 at 10:44 AM, Matei Zaharia matei.zaha...@gmail.com wrote: You could build Spark with Scala 2.11 on Mac / Linux and transfer it over to Windows. AFAIK it should build on Windows too, the only problem is that Maven might take a long time to download dependencies. What errors are you seeing? Matei On Apr 16, 2015, at 9:23 AM, Arun Lists lists.a...@gmail.com wrote: We run Spark on Mac and Linux but also need to run it on Windows 8.1 and Windows Server. We ran into problems with the Scala 2.10 binary bundle for Spark 1.3.0 but managed to get it working. However, on Mac/Linux, we are on Scala 2.11.6 (we built Spark from the sources). On Windows, however, despite our best efforts we cannot get Spark 1.3.0 as built from sources working for Scala 2.11.6. Spark has too many moving parts and dependencies! When can we expect to see a binary bundle for Spark 1.3.0 that is built for Scala 2.11.6? I read somewhere that the only reason that Spark 1.3.0 is still built for Scala 2.10 is because Kafka is still on Scala 2.10. For those of us who don't use Kafka, can we have a Scala 2.11 bundle? If there isn't an official bundle arriving any time soon, can someone who has built it for Windows 8.1 successfully please share with the group? Thanks, arun - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Streaming problems running 24x7
Hi Akhil, Thank you for your response. I think it is not because of the processing time, in fact the delay is under 1 second, while the batch interval is 10 seconds… The data volume is low (10 lines / second). By the way, I have seen some better results after changing to this call of KafkaUtils: KafkaUtils.createDirectStream. CPU usage is low and stable, but memory is slowly increasing… But at least the process lasts longer. Best regards, Miquel From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Thursday, April 16, 2015 12:07 To: González Salgado, Miquel CC: user@spark.apache.org Subject: Re: Streaming problems running 24x7 I used to hit this issue when my processing time exceeded the batch duration. Here are a few workarounds: - Use storage level MEMORY_AND_DISK - Enable WAL and checkpointing The above two will slow things down a little bit. If you want low latency, what you can try is: - Use storage level MEMORY_ONLY_2 (at least it replicates the data) - Tachyon-based off-heap storage (haven't tried this, but will let you know) And from Spark version 1.3.1, they have purged the old WAL and it has better performance. You could try that also. On 16 Apr 2015 14:10, Miquel miquel.gonza...@tecsidel.es wrote: Hello, I'm finding problems running a Spark streaming job for more than a few hours (3 or 4). It begins working OK, but it degrades until failure. Some of the symptoms: - Consumed memory and CPU keep getting higher and higher, and finally some error is being thrown (java.lang.Exception: Could not compute split, block input-0-1429168311800 not found) and data stops being calculated. - The delay shown in the web UI also keeps increasing. - After some hours disk space is being consumed. There are a lot of directories with names like /tmp/spark-e3505437-f509-4b5b-92d2-ae2559badb3c The job is basically reading information from a Kafka topic and calculating several topN tables for some key and value fields related to netflow data. Some of the parameters are these: - batch interval: 10 seconds - window calculation: 1 minute - spark.cleaner.ttl: 5 minutes The execution is standalone on one machine (16GB RAM, 12 cores), and the options to run it are as follows: /opt/spark/bin/spark-submit --driver-java-options -XX:+UseCompressedOops --jars $JARS --class $APPCLASS --master local[2] $APPJAR Does someone have some clues about the problem? I don't know if it is a configuration problem or some error in the code that is causing memory leaks. Thank you in advance! Miquel PS: the code is basically this:

object NetflowTopn {
  var appPath = "."
  var zkQuorum = ""
  var group = ""
  var topics = ""
  var numThreads = 1
  var batch_interval = 10
  var n_window = 1
  var n_slide = 1
  var topnsize = 10

  var hm = Map[String,Int]()
  hm += ( "unix_secs"   -> 0 )
  hm += ( "unix_nsecs"  -> 1 )
  hm += ( "sysuptime"   -> 2 )
  hm += ( "exaddr"      -> 3 )
  hm += ( "dpkts"       -> 4 )
  hm += ( "doctets"     -> 5 )
  hm += ( "first"       -> 6 )
  hm += ( "last"        -> 7 )
  hm += ( "engine_type" -> 8 )
  hm += ( "engine_id"   -> 9 )
  hm += ( "srcaddr"     -> 10 )
  hm += ( "dstaddr"     -> 11 )
  hm += ( "nexthop"     -> 12 )
  hm += ( "input"       -> 13 )
  hm += ( "output"      -> 14 )
  hm += ( "srcport"     -> 15 )
  hm += ( "dstport"     -> 16 )
  hm += ( "prot"        -> 17 )
  hm += ( "tos"         -> 18 )
  hm += ( "tcp_flags"   -> 19 )
  hm += ( "src_mask"    -> 20 )
  hm += ( "dst_mask"    -> 21 )
  hm += ( "src_as"      -> 22 )
  hm += ( "dst_as"      -> 23 )

  def getKey (lcamps: Array[String], camp: String): String = {
    if (camp == "total") return "total"
    else return lcamps(hm(camp))
  }

  def getVal (lcamps: Array[String], camp: String): Long = {
    if (camp == "flows") return 1L
    else return lcamps(hm(camp)).toLong
  }

  def getKeyVal (line: String, keycamps: List[String], valcamp: String ) = {
    val arr = line.split(",")
    (keycamps.map(getKey(arr, _)).mkString(",") , getVal(arr, valcamp) )
  }

  def writeOutput (data: Array[(Long, String)], keycamps_str: String, csvheader: String, valcamp: String, prefix: String) = {
    val ts = System.currentTimeMillis
    val f1 = appPath + "/data/" + prefix + "_" + keycamps_str + "_" + valcamp + ".csv"
    val f1f = new File(f1);
    val ftmpf = new File(f1 + ts);
    val pw = new PrintWriter(ftmpf)
    pw.println(csvheader)
    data.foreach{ t => pw.println (t._2 + "," + t._1) }
    pw.close
    ftmpf.renameTo(f1f);
  }

  def main(args: Array[String]) {
    if (args.length < 1) {
      System.err.println("Usage: NetflowTopn apppath")
      System.exit(1)
    }
    appPath = args(0)
    try {
      val prop = new Properties()
      prop.load(new FileInputStream(appPath + "/conf/app.properties"))
      zkQuorum = prop.getProperty("KAFKA_HOST")
      group = prop.getProperty("KAFKA_GROUP")
      topics = prop.getProperty("KAFKA_TOPIC")
      numThreads =
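Regarding the KafkaUtils.createDirectStream change Miquel mentions above, a minimal sketch of the Spark 1.3 direct-stream call is below. The broker list and topic name are hypothetical, and ssc is assumed to be the StreamingContext created elsewhere in this job (that part of the code is truncated above).

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// Hypothetical broker list and topic name
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val topicSet = Set("netflow")

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicSet)
  .map(_._2)   // keep only the message payload, as with the receiver-based API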
Re: RDD collect hangs on large input data
Thanks for your answer Imran. I haven't tried your suggestions yet, but setting spark.shuffle.blockTransferService=nio solved my issue. There is a JIRA for this: https://issues.apache.org/jira/browse/SPARK-6962. Zsolt 2015-04-14 21:57 GMT+02:00 Imran Rashid iras...@cloudera.com: is it possible that when you switch to the bigger data set, your data is skewed, and so that some tasks generate far more data? reduceByKey could result in a huge amount of data going to a small number of tasks. I'd suggest (a) seeing what happens if you don't collect() -- eg. instead try writing to hdfs with saveAsObjectFile. (b) take a look at what is happening on the executors with the long running tasks. You can get thread dumps via the UI (or you can login into the boxes and use jstack). This might point to some of your code that is taking a long time, or it might point to spark internals. On Wed, Apr 8, 2015 at 3:45 AM, Zsolt Tóth toth.zsolt@gmail.com wrote: I use EMR 3.3.1 which comes with Java 7. Do you think that this may cause the issue? Did you test it with Java 8?
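For reference, a small sketch of where the workaround from SPARK-6962 goes (the app name is hypothetical); the same property can also be passed to spark-submit with --conf.

import org.apache.spark.SparkConf

// Switch the shuffle block transfer service from netty back to nio until the issue is fixed
val conf = new SparkConf()
  .setAppName("LargeCollectJob")
  .set("spark.shuffle.blockTransferService", "nio")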
Re: Custom partioner
By custom installation, I meant change the code and build it. I have not done the complete impact analysis, just had a look on the code. When you say, same key goes to same node, It would need shuffling unless the raw data you are reading is present that way. On Apr 17, 2015 6:30 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi Archit Thanks for reply. How can I don the costom compilation so reduce it to 4 bytes.I want to make it to 4 bytes in any case can you please guide? I am applying flatMapvalue in each step after ZipWithIndex it should be in same Node right? Why its suffling? Also I am running with very less records currently still its shuffling ? regards jeetendra On 17 April 2015 at 15:58, Archit Thakur archit279tha...@gmail.com wrote: I dont think you can change it to 4 bytes without any custom compilation. To make same key go to same node, you'll have to repartition the data, which is shuffling anyway. Unless your raw data is such that the same key is on same node, you'll have to shuffle atleast once to make same key on same node. On Thu, Apr 16, 2015 at 10:16 PM, Jeetendra Gangele gangele...@gmail.com wrote: Hi All I have a RDD which has 1 million keys and each key is repeated from around 7000 values so total there will be around 1M*7K records in RDD. and each key is created from ZipWithIndex so key start from 0 to M-1 the problem with ZipWithIndex is it take long for key which is 8 bytes. can I reduce it to 4 bytes? Now how Can I make sure the record with same key will go the same node so that I can avoid shuffling. Also how default partition-er will work here. Regards jeetendra
Running into several problems with Data Frames
I decided to play around with DataFrames this morning but I'm running into quite a few issues. I'm assuming that I must be doing something wrong so would appreciate some advice. First, I create my Data Frame.

import sqlContext.implicits._
case class Entity(InternalId: Long, EntityId: Long, EntityType: String, CustomerId: String, EntityURI: String, NumDocs: Long)
val entities = sc.textFile("s3n://darin/Entities.csv")
val entitiesArr = entities.map(v => v.split('|'))
val dfEntity = entitiesArr.map(arr => Entity(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4), arr(5).toLong)).toDF()

Second, I verify the schema.

dfEntity.printSchema
root
|-- InternalId: long (nullable = false)
|-- EntityId: long (nullable = false)
|-- EntityType: string (nullable = true)
|-- CustomerId: string (nullable = true)
|-- EntityURI: string (nullable = true)
|-- NumDocs: long (nullable = false)

Third, I verify I can select a column.

dfEntity.select("InternalId").limit(10).show()
InternalId
1 2 3 4 5 6 7 8 9 10

But, things then start to break down. Let's assume I want to filter so I only have records where the InternalId is > 5.

dfEntity.filter("InternalId" > 5L).count()

But, this gives me the following error message. Doesn't the schema above indicate the InternalId column should be of type Long?

console:42: error: type mismatch; found : Long(5L) required: String dfEntity.filter("InternalId" > 5L).count()

I then try the following

dfEntity.filter(dfEntity("InternalId") > 5L).count()

Now, this gives me the following error instead.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 13 in stage 153.0 failed 4 times, most recent failure: Lost task 13.3 in stage 153.0 (TID 1636, ip-10-0-200-6.ec2.internal): java.lang.ArrayIndexOutOfBoundsException

I'm using Apache Spark 1.3. Thanks. Darin. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
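A hedged note on the second failure: the ArrayIndexOutOfBoundsException is thrown inside the task, so it most likely comes from input lines that do not split into six fields rather than from the filter expression itself (this is an assumption about the data, not something visible from the stack trace alone). A defensive variant of the original mapping would be:

// Assuming the exception comes from short/malformed lines (an assumption about the data),
// guard on the field count before indexing arr(5)
val dfEntity = entitiesArr
  .filter(arr => arr.length == 6)
  .map(arr => Entity(arr(0).toLong, arr(1).toLong, arr(2), arr(3), arr(4), arr(5).toLong))
  .toDF()

The first failure is just a syntax issue: dfEntity.filter(dfEntity("InternalId") > 5L) or the string form dfEntity.filter("InternalId > 5") are the supported ways to express the predicate in 1.3.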
ClassCastException while caching a query
Hi all, Spark 1.2.1. I have a Cassandra column family and doing the following SchemaRDD s = cassandraSQLContext.sql(select user.id as user_id from user); // user.id is UUID in table definition s.registerTempTable( my_user ); s.cache(); // throws following exception // tried the cassandraSQLContext.cacheTable( my_user ); // also throws the same exception Is there a way to resolve it. Regards. Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: java.util.UUID cannot be cast to java.lang.String at org.apache.spark.sql.catalyst.expressions.GenericRow.getString(Row.scala:183) at org.apache.spark.sql.columnar.StringColumnStats.gatherStats(ColumnStats.scala:208) at org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:56) at org.apache.spark.sql.columnar.NativeColumnBuilder.org $apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87) at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78) at org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87) at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:125) at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:112) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70) at org.apache.spark.rdd.RDD.iterator(RDD.scala:245) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
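One possible workaround, offered as an untested assumption rather than a confirmed fix: cast the UUID column to a string in the query itself, so the in-memory columnar cache (whose StringColumnStats is what throws above) only ever sees strings.

// Possible workaround (assumption): cast the UUID column before caching
val s = cassandraSQLContext.sql("select cast(user.id as string) as user_id from user")
s.registerTempTable("my_user")
s.cache()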
SparkStreaming 1.3.0 fileNotFound Exception while using WAL Checkpoints
Hi With SparkStreaming on 1.3.0 version when I'm using WAL and checkpoints, sometimes, I'm hitting fileNotFound exceptions. Here's the complete stacktrace: https://gist.github.com/akhld/126b945f7fef408a525e The application simply reads data from Kafka and does a simple wordcount over it. Batch duration is 1 second and processing delay is somewhat around 3-6 seconds. (Standalone 2 node cluster with 15GB of mem and 4 cores each) Without WAL and checkpoints and using only MEMORY_ONLY as StorageLevel Instead of fileNotFound, the exception is blockNotFound which is reduced while using MEMORY_ONLY_2 as StorageLevel, and when using MEMORY_AND_DISK, the performance is really awful and it fills up disk in /tmp with spark-d2ad4262-0f6f-409d-b51f-a0a871cbf64f files. Any thoughts on this are welcome. Thanks Best Regards
Executor memory in web UI
Hi, i am a bit confused with the executor-memory option. I am running applications with Standalone cluster manager with 8 workers with 4gb memory and 2 cores each and when i submit my application with spark-submit i use --executor-memory 1g. In the web ui in the completed applications table i see that my application was correctly submitted with 1g memory per node as expected but when i check the executors tab of the application i see that every executor launched with 530mb which is about half the memory of the configuration. I would really appreciate an explanation if anyone knew. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executor-memory-in-web-UI-tp22538.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Metrics Servlet on spark 1.2
Hi, I am unable to access the metrics servlet on spark 1.2. I tried to access it from the app master UI on port 4040 but i dont see any metrics there. Is it a known issue with spark 1.2 or am I doing something wrong? Also how do I publish my own metrics and view them on this servlet? Thanks, Udit
When are TaskCompletionListeners called?
Hi, I'm trying to figure out when TaskCompletionListeners are called -- are they called at the end of the RDD's compute() method, or after the iteration through the iterator of the compute() method is completed? To put it another way, is this OK:

class DatabaseRDD[T] extends RDD[T] {
  def compute(...): Iterator[T] = {
    val session = // acquire a DB session
    TaskContext.get.addTaskCompletionListener { (context) => session.release() }
    val iterator = session.query(...)
    iterator
  }
}
Re: How to do dispatching in Streaming?
Thanks everyone for the reply. Looks like foreachRDD + filtering is the way to go. I'll have 4 independent Spark streaming applications so the overhead seems acceptable. Jianshi On Fri, Apr 17, 2015 at 5:17 PM, Evo Eftimov evo.efti...@isecc.com wrote: Good use of analogies :) Yep, friction (or entropy in general) exists in everything – but hey, by adding and doing “more work” at the same time (aka more powerful rockets) some people have overcome the friction of the air and even got as far as the moon and beyond. It is all about the bottom line / the big picture – in some models friction can be a huge factor in the equations, in some others it is just part of the landscape From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Friday, April 17, 2015 10:12 AM To: Evo Eftimov Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming? Evo, In Spark there's a fixed scheduling cost for each task, so more tasks mean an increased bottom line for the same amount of work being done. The number of tasks per batch interval should relate to the CPU resources available for the job, following the same 'rule of thumb' as for Spark, being 2-3 times the # of cores. In that physical model presented before, I think we could consider this scheduling cost as a form of friction. -kr, Gerard. On Thu, Apr 16, 2015 at 11:47 AM, Evo Eftimov evo.efti...@isecc.com wrote: Ooops – what does “more work” mean in a Parallel Programming paradigm and does it always translate into “inefficiency”? Here are a few laws of physics in this space: 1. More Work, if done AT THE SAME time AND fully utilizing the cluster resources, is a GOOD thing 2. More Work which can not be done at the same time and has to be processed sequentially is a BAD thing So the key is whether it is about 1 or 2 and, if it is about 1, whether it leads to e.g. Higher Throughput and Lower Latency or not Regards, Evo Eftimov From: Gerard Maas [mailto:gerard.m...@gmail.com] Sent: Thursday, April 16, 2015 10:41 AM To: Evo Eftimov Cc: Tathagata Das; Jianshi Huang; user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming? From experience, I'd recommend using the dstream.foreachRDD method and doing the filtering within that context. Extending the example of TD, something like this:

dstream.foreachRDD { rdd =>
  rdd.cache()
  messageType.foreach { msgTyp =>
    val selection = rdd.filter(msgTyp.match(_))
    selection.foreach { ... }
  }
  rdd.unpersist()
}

I would discourage the use of:

MessageType1DStream = MainDStream.filter(message type1)
MessageType2DStream = MainDStream.filter(message type2)
MessageType3DStream = MainDStream.filter(message type3)

Because it will be a lot more work to process on the Spark side. Each DStream will schedule tasks for each partition, resulting in #dstream x #partitions x #stages tasks instead of the #partitions x #stages with the approach presented above. -kr, Gerard.
On Thu, Apr 16, 2015 at 10:57 AM, Evo Eftimov evo.efti...@isecc.com wrote: And yet another way is to demultiplex at one point, which will yield separate DStreams for each message type, which you can then process in independent DAG pipelines in the following way:

MessageType1DStream = MainDStream.filter(message type1)
MessageType2DStream = MainDStream.filter(message type2)
MessageType3DStream = MainDStream.filter(message type3)

Then proceed with your processing independently on MessageType1DStream, MessageType2DStream and MessageType3DStream, i.e. each of them is a starting point of a new DAG pipeline running in parallel From: Tathagata Das [mailto:t...@databricks.com] Sent: Thursday, April 16, 2015 12:52 AM To: Jianshi Huang Cc: user; Shao, Saisai; Huang Jie Subject: Re: How to do dispatching in Streaming? It may be worthwhile to architect the computation in a different way.

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    // do different things for each record based on filters
  }
}

TD On Sun, Apr 12, 2015 at 7:52 PM, Jianshi Huang jianshi.hu...@gmail.com wrote: Hi, I have a Kafka topic that contains dozens of different types of messages. And for each one I'll need to create a DStream for it. Currently I have to filter the Kafka stream over and over, which is very inefficient. So what's the best way to do dispatching in Spark Streaming? (one DStream -> multiple DStreams) Thanks, -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
Spark hanging after main method completes
I recently started using Spark version 1.3.0 in standalone mode (with Scala 2.10.3), and I'm running into an odd problem. I'm loading data from a file using sc.textFile, doing some conversion of the data, and then clustering it. When I do this with a small file (10 lines, 9 KB), it works fine, and the program terminates. However, when I load my full data file (400,000 lines, 167 MB), the process hangs sometime after the last line of my main() method. (The last line is a print statement, and it gets printed.) I am creating quite a few objects, so I don't know if perhaps the garbage collection is just taking a really long time? (5+ minutes; I haven't had the patience to let it go longer than that.) Is there anything I should try to fix this, or to help diagnose the issue? Thanks, Michael -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-hanging-after-main-method-completes-tp22544.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Distinct is very slow
I am saying to partition with something like partitionBy(new HashPartitioner(16)) - will this not work? On 17 April 2015 at 21:28, Jeetendra Gangele gangele...@gmail.com wrote: I have given 3000 tasks to mapToPair, now it's taking so much memory and shuffling and wasting time there. Here are the stats when I run with very small data; for almost all the data it's doing shuffling, not sure what is happening here, any idea? - Total task time across all tasks: 11.0 h - Shuffle read: 153.8 MB - Shuffle write: 288.0 MB On 17 April 2015 at 14:32, Jeetendra Gangele gangele...@gmail.com wrote: mapToPair is running with 32 tasks but very slowly because of a lot of shuffle read. Attaching a screenshot; each task is running for 10 mins, even though inside the function I am not doing anything costly.
Which version of Hive QL is Spark 1.3.0 using?
So I'm trying to store the results of a query into a DataFrame, but I get the following exception thrown: Exception in thread main java.lang.RuntimeException: [1.71] failure: ``*'' expected but `select' found SELECT DISTINCT OutSwitchID FROM wtbECRTemp WHERE OutSwtichID NOT IN (SELECT SwitchID FROM tmpCDRSwitchIDs) And it has a ^ pointing to the second SELECT. But according to this (https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SubQueries), subqueries should be supported with Hive 0.13.0. So which version is Spark using? And if subqueries are not currently supported, what would be a suitable alternative to this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Which-version-of-Hive-QL-is-Spark-1-3-0-using-tp22542.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Which version of Hive QL is Spark 1.3.0 using?
Support for sub queries in predicates hasn't been resolved yet - please refer to SPARK-4226 BTW, Spark 1.3 default bindings to Hive 0.13.1 On Fri, Apr 17, 2015 at 09:18 ARose ashley.r...@telarix.com wrote: So I'm trying to store the results of a query into a DataFrame, but I get the following exception thrown: Exception in thread main java.lang.RuntimeException: [1.71] failure: ``*'' expected but `select' found SELECT DISTINCT OutSwitchID FROM wtbECRTemp WHERE OutSwtichID NOT IN (SELECT SwitchID FROM tmpCDRSwitchIDs) And it has a ^ pointing to the second SELECT. But according to this ( https://cwiki.apache.org/confluence/display/Hive/LanguageManual+SubQueries ), subqueries should be supported with Hive 0.13.0. So which version is Spark using? And if subqueries are not currently supported, what would be a suitable alternative to this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Which-version-of-Hive-QL-is-Spark-1-3-0-using-tp22542.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
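Until SPARK-4226 lands, a common rewrite of a NOT IN predicate is a left outer join plus a null check. A hedged sketch, assuming the two temp tables from the original post are already registered (and using the OutSwitchID spelling consistently):

// Equivalent of: SELECT DISTINCT OutSwitchID FROM wtbECRTemp
//                WHERE OutSwitchID NOT IN (SELECT SwitchID FROM tmpCDRSwitchIDs)
val df = sqlContext.sql("""
  SELECT DISTINCT t.OutSwitchID
  FROM wtbECRTemp t
  LEFT OUTER JOIN tmpCDRSwitchIDs s ON t.OutSwitchID = s.SwitchID
  WHERE s.SwitchID IS NULL
""")

Note this treats NULLs slightly differently from NOT IN, which is usually what you want anyway.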
Re: When are TaskCompletionListeners called?
It's the latter -- after Spark gets to the end of the iterator (or if it hits an exception), so your example is good, that is exactly what it is intended for. On Fri, Apr 17, 2015 at 12:23 PM, Akshat Aranya aara...@gmail.com wrote: Hi, I'm trying to figure out when TaskCompletionListeners are called -- are they called at the end of the RDD's compute() method, or after the iteration through the iterator of the compute() method is completed? To put it another way, is this OK:

class DatabaseRDD[T] extends RDD[T] {
  def compute(...): Iterator[T] = {
    val session = // acquire a DB session
    TaskContext.get.addTaskCompletionListener { (context) => session.release() }
    val iterator = session.query(...)
    iterator
  }
}
Re: Spark Code to read RCFiles
Hi, I'm new to Spark and am working on a proof of concept. I'm using Spark 1.3.0 and running in local mode. I can read and parse an RCFile using Spark, however the performance is not as good as I hoped. I'm testing using ~800k rows and it is taking about 30 mins to process. Is there a better way to load and process an RCFile? The only reference to RCFile in 'Learning Spark' is in the SparkSQL chapter. Is using SparkSQL for RCFiles the recommendation, and should I avoid using Spark core functionality for RCFiles? I'm using the following code to build RDD[Record]:

val records: RDD[Record] = sc.hadoopFile(rcFile,
    classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
    classOf[LongWritable],
    classOf[BytesRefArrayWritable])
  .map(x => ( x._1.get, parse( x._2 ) ) ).map(pair => pair._2)

the function parse is defined as:

def parse(braw: BytesRefArrayWritable ): Record = {
  val serDe = new ColumnarSerDe()
  var tbl: Properties = new Properties();
  tbl.setProperty("serialization.format", "9")
  tbl.setProperty("columns", "time,id,name,application")
  tbl.setProperty("columns.types", "string:int:string:string")
  tbl.setProperty("serialization.null.format", "NULL");
  serDe.initialize(new Configuration(), tbl);
  val oi = serDe.getObjectInspector()
  val soi: StructObjectInspector = oi.asInstanceOf[StructObjectInspector]
  val fieldRefs: Buffer[_ <: StructField] = soi.getAllStructFieldRefs().asScala
  val row = serDe.deserialize(braw)
  val timeRec = soi.getStructFieldData(row, fieldRefs(0))
  val idRec = soi.getStructFieldData(row, fieldRefs(1))
  val nameRec = soi.getStructFieldData(row, fieldRefs(2))
  val applicationRec = soi.getStructFieldData(row, fieldRefs(3))
  var timeOI = fieldRefs(0).getFieldObjectInspector().asInstanceOf[StringObjectInspector];
  var time = timeOI.getPrimitiveJavaObject(timeRec);
  var idOI = fieldRefs(1).getFieldObjectInspector().asInstanceOf[IntObjectInspector];
  var id = idOI.get(idRec);
  var nameOI = fieldRefs(2).getFieldObjectInspector().asInstanceOf[StringObjectInspector];
  var name = nameOI.getPrimitiveJavaObject(nameRec);
  var appOI = fieldRefs(3).getFieldObjectInspector().asInstanceOf[StringObjectInspector];
  var app = appOI.getPrimitiveJavaObject(applicationRec);
  new Record(time, id, name, app)
}

Thanks in advance, Glenda -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Code-to-read-RCFiles-tp14934p22545.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
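One likely cost in the code above is that parse() builds and initialises a new ColumnarSerDe and ObjectInspector for every single row. A hedged sketch of a mapPartitions variant that does that setup once per partition is below; it reuses the same imports as the original snippet, and parseWith(serDe, soi, braw) is a hypothetical helper holding the per-row field extraction from parse().

// Build the SerDe once per partition instead of once per row
val records: RDD[Record] = sc.hadoopFile(rcFile,
    classOf[RCFileInputFormat[LongWritable, BytesRefArrayWritable]],
    classOf[LongWritable],
    classOf[BytesRefArrayWritable])
  .mapPartitions { iter =>
    val serDe = new ColumnarSerDe()
    val tbl = new Properties()
    tbl.setProperty("serialization.format", "9")
    tbl.setProperty("columns", "time,id,name,application")
    tbl.setProperty("columns.types", "string:int:string:string")
    tbl.setProperty("serialization.null.format", "NULL")
    serDe.initialize(new Configuration(), tbl)
    val soi = serDe.getObjectInspector().asInstanceOf[StructObjectInspector]
    iter.map { case (_, braw) => parseWith(serDe, soi, braw) }  // hypothetical per-row helper
  }

Whether this closes the whole 30-minute gap is an open question, but per-row SerDe construction is a common culprit in this kind of workload.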
Re: How to persist RDD return from partitionBy() to disk?
https://issues.apache.org/jira/browse/SPARK-1061 Note the proposed fix isn't to have Spark automatically know about the partitioner when it reloads the data, but at least to make it *possible* for it to be done at the application level. On Fri, Apr 17, 2015 at 11:35 AM, Wang, Ningjun (LNG-NPV) ningjun.w...@lexisnexis.com wrote: I have a huge RDD[Document] with millions of items. I partitioned it using HashPartitioner and saved it as an object file. But when I load the object file back into an RDD, I lost the HashPartitioner. How do I preserve the partitions when loading the object file? Here is the code

val docVectors : RDD[DocVector] = computeRdd() // expensive calculation
val partitionedDocVectors : RDD[(String, DocVector)] = docVectors.keyBy(d => d.id).partitionBy(new HashPartitioner(16))
partitionedDocVectors.saveAsObjectFile("c:/temp/partitionedDocVectors.obj")
// At this point, I check the folder c:/temp/partitionedDocVectors.obj, it contains 16 parts: "part-0, part-1, ... part-00015"
// Now load the object file back
val partitionedDocVectors2 : RDD[(String, DocVector)] = sc.objectFile("c:/temp/partitionedDocVectors.obj")
// Now partitionedDocVectors2 contains 956 parts and it has no partitioner
println(s"partitions: ${partitionedDocVectors.partitions.size}") // returns 956
if (idAndDocVectors.partitioner.isEmpty) println("No partitioner") // it does print out this line

So how can I preserve the partitions of partitionedDocVectors on disk so I can load it back? Ningjun
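Until SPARK-1061 is addressed, the partitioner simply is not recorded in the object file, so the only option after loading is to re-apply the same partitioner, which costs one shuffle. A small sketch under that assumption (paths and partition count taken from the question above):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD

// Re-apply the same partitioner after the load; this does shuffle once
val reloaded: RDD[(String, DocVector)] =
  sc.objectFile[(String, DocVector)]("c:/temp/partitionedDocVectors.obj")
    .partitionBy(new HashPartitioner(16))

reloaded.partitioner.isDefined   // now true, so later co-partitioned joins avoid further shuffles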
Need Costom RDD
Hi All, I have an RDD[Object], then I convert it to RDD[(Object, Long)] with zipWithIndex. Here the index is a Long and it's taking 8 bytes. Is there any way to make it an Integer? There is no API available which gives an Int index. How can I create a custom RDD so that it takes only 4 bytes for the index part? Also, why is the API designed in such a way that it gives the index of an element as the second part of the tuple? Regards j
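There is no built-in Int-indexed variant, but the narrowing (and the tuple swap) can be done with a plain map on top of zipWithIndex. A minimal sketch, where rdd and the element type MyObject are placeholders for the original RDD:

// Safe only while the RDD has fewer than Int.MaxValue elements, otherwise toInt wraps
val withIntKey = rdd.zipWithIndex().map { case (obj, idx) => (idx.toInt, obj) }  // RDD[(Int, MyObject)]

zipWithIndex puts the element first by convention (it mirrors Scala collections), which is why the index lands in the second position; the map above swaps the order so the index can be used directly as the key.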
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Thanks. Would that distribution work for hdp 2.2? On Fri, Apr 17, 2015 at 2:19 PM, Zhan Zhang zzh...@hortonworks.com wrote: You don’t need to put any yarn assembly in hdfs. The spark assembly jar will include everything. It looks like your package does not include yarn module, although I didn’t find anything wrong in your mvn command. Can you check whether the ExecutorLauncher class is in your jar file or not? BTW: For spark-1.3, you can use the binary distribution from apache. Thanks. Zhan Zhang On Apr 17, 2015, at 2:01 PM, Udit Mehta ume...@groupon.com wrote: I followed the steps described above and I still get this error: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher I am trying to build spark 1.3 on hdp 2.2. I built spark from source using: build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Maybe I am not putting the correct yarn assembly on hdfs or some other issue? Thanks, Udit On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang zzh...@hortonworks.com wrote: Hi Folks, Just to summarize it to run SPARK on HDP distribution. 1. The spark version has to be 1.3.0 and above if you are using upstream distribution. This configuration is mainly for HDP rolling upgrade purpose, and the patch only went into spark upstream from 1.3.0. 2. In $SPARK_HOME/conf/sp[ark-defaults.conf, adding following settings. spark.driver.extraJavaOptions -Dhdp.version=x spark.yarn.am.extraJavaOptions -Dhdp.version=x 3. In $SPARK_HOME/java-opts, add following options. -Dhdp.version=x Thanks. Zhan Zhang On Mar 30, 2015, at 6:56 AM, Doug Balog doug.sparku...@dugos.com wrote: The “best” solution to spark-shell’s problem is creating a file $SPARK_HOME/conf/java-opts with “-Dhdp.version=2.2.0.0-2014” Cheers, Doug On Mar 28, 2015, at 1:25 PM, Michael Stone mst...@mathom.us wrote: I've also been having trouble running 1.3.0 on HDP. The spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 configuration directive seems to work with pyspark, but not propagate when using spark-shell. (That is, everything works find with pyspark, and spark-shell fails with the bad substitution message.) Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: External JARs not loading Spark Shell Scala 2.11
H... I don't follow. The 2.11.x series is supposed to be binary compatible against user code. Anyway, I was building Spark against 2.11.2 and still saw the problems with the REPL. I've created a bug report: https://issues.apache.org/jira/browse/SPARK-6989 https://issues.apache.org/jira/browse/SPARK-6989 I hope this helps. Cheers, Michael On Apr 17, 2015, at 1:41 AM, Sean Owen so...@cloudera.com wrote: Doesn't this reduce to Scala isn't compatible with itself across maintenance releases? Meaning, if this were fixed then Scala 2.11.{x 6} would have similar failures. It's not not-ready; it's just not the Scala 2.11.6 REPL. Still, sure I'd favor breaking the unofficial support to at least make the latest Scala 2.11 the unbroken one. On Fri, Apr 17, 2015 at 7:58 AM, Michael Allman mich...@videoamp.com wrote: FWIW, this is an essential feature to our use of Spark, and I'm surprised it's not advertised clearly as a limitation in the documentation. All I've found about running Spark 1.3 on 2.11 is here: http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211 Also, I'm experiencing some serious stability problems simply trying to run the Spark 1.3 Scala 2.11 REPL. Most of the time it fails to load and spews a torrent of compiler assertion failures, etc. See attached. Unfortunately, it appears the Spark 1.3 Scala 2.11 REPL is simply not ready for production use. I was going to file a bug, but it seems clear that the current implementation is going to need to be forward-ported to Scala 2.11.6 anyway. We already have an issue for that: https://issues.apache.org/jira/browse/SPARK-6155 Michael On Apr 9, 2015, at 10:29 PM, Prashant Sharma scrapco...@gmail.com wrote: You will have to go to this commit ID 191d7cf2a655d032f160b9fa181730364681d0e7 in Apache spark. [1] Once you are at that commit, you need to review the changes done to the repl code and look for the relevant occurrences of the same code in scala 2.11 repl source and somehow make it all work. Thanks, 1. http://githowto.com/getting_old_versions Prashant Sharma On Thu, Apr 9, 2015 at 4:40 PM, Alex Nakos ana...@gmail.com wrote: Ok, what do i need to do in order to migrate the patch? Thanks Alex On Thu, Apr 9, 2015 at 11:54 AM, Prashant Sharma scrapco...@gmail.com wrote: This is the jira I referred to https://issues.apache.org/jira/browse/SPARK-3256. Another reason for not working on it is evaluating priority between upgrading to scala 2.11.5(it is non trivial I suppose because repl has changed a bit) or migrating that patch is much simpler. Prashant Sharma On Thu, Apr 9, 2015 at 4:16 PM, Alex Nakos ana...@gmail.com wrote: Hi- Was this the JIRA issue? https://issues.apache.org/jira/browse/SPARK-2988 Any help in getting this working would be much appreciated! Thanks Alex On Thu, Apr 9, 2015 at 11:32 AM, Prashant Sharma scrapco...@gmail.com wrote: You are right this needs to be done. I can work on it soon, I was not sure if there is any one even using scala 2.11 spark repl. Actually there is a patch in scala 2.10 shell to support adding jars (Lost the JIRA ID), which has to be ported for scala 2.11 too. If however, you(or anyone else) are planning to work, I can help you ? Prashant Sharma On Thu, Apr 9, 2015 at 3:08 PM, anakos ana...@gmail.com wrote: Hi- I am having difficulty getting the 1.3.0 Spark shell to find an external jar. 
I have build Spark locally for Scala 2.11 and I am starting the REPL as follows: bin/spark-shell --master yarn --jars data-api-es-data-export-4.0.0.jar I see the following line in the console output: 15/04/09 09:52:15 INFO spark.SparkContext: Added JAR file:/opt/spark/spark-1.3.0_2.11-hadoop2.3/data-api-es-data-export-4.0.0.jar at http://192.168.115.31:54421/jars/data-api-es-data-export-4.0.0.jar with timestamp 1428569535904 but when i try to import anything from this jar, it's simply not available. When I try to add the jar manually using the command :cp /path/to/jar the classes in the jar are still unavailable. I understand that 2.11 is not officially supported, but has anyone been able to get an external jar loaded in the 1.3.0 release? Is this a known issue? I have tried searching around for answers but the only thing I've found that may be related is this: https://issues.apache.org/jira/browse/SPARK-3257 Any/all help is much appreciated. Thanks Alex -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/External-JARs-not-loading-Spark-Shell-Scala-2-11-tp22434.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi Udit, By the way, do you mind to share the whole log trace? Thanks. Zhan Zhang On Apr 17, 2015, at 2:26 PM, Udit Mehta ume...@groupon.commailto:ume...@groupon.com wrote: I am just trying to launch a spark shell and not do anything fancy. I got the binary distribution from apache and put the spark assembly on hdfs. I then specified the yarn.jars option in spark defaults to point to the assembly in hdfs. I still got the same error so though I had to build it for hdp. I am using hdp 2.2 with hadoop 2.6/ On Fri, Apr 17, 2015 at 2:21 PM, Udit Mehta ume...@groupon.commailto:ume...@groupon.com wrote: Thanks. Would that distribution work for hdp 2.2? On Fri, Apr 17, 2015 at 2:19 PM, Zhan Zhang zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote: You don’t need to put any yarn assembly in hdfs. The spark assembly jar will include everything. It looks like your package does not include yarn module, although I didn’t find anything wrong in your mvn command. Can you check whether the ExecutorLauncher class is in your jar file or not? BTW: For spark-1.3, you can use the binary distribution from apache. Thanks. Zhan Zhang On Apr 17, 2015, at 2:01 PM, Udit Mehta ume...@groupon.commailto:ume...@groupon.com wrote: I followed the steps described above and I still get this error: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher I am trying to build spark 1.3 on hdp 2.2. I built spark from source using: build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Maybe I am not putting the correct yarn assembly on hdfs or some other issue? Thanks, Udit On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote: Hi Folks, Just to summarize it to run SPARK on HDP distribution. 1. The spark version has to be 1.3.0 and above if you are using upstream distribution. This configuration is mainly for HDP rolling upgrade purpose, and the patch only went into spark upstream from 1.3.0. 2. In $SPARK_HOME/conf/sp[ark-defaults.conf, adding following settings. spark.driver.extraJavaOptions -Dhdp.version=x spark.yarn.am.extraJavaOptions -Dhdp.version=x 3. In $SPARK_HOME/java-opts, add following options. -Dhdp.version=x Thanks. Zhan Zhang On Mar 30, 2015, at 6:56 AM, Doug Balog doug.sparku...@dugos.commailto:doug.sparku...@dugos.com wrote: The “best” solution to spark-shell’s problem is creating a file $SPARK_HOME/conf/java-opts with “-Dhdp.version=2.2.0.0-2014” Cheers, Doug On Mar 28, 2015, at 1:25 PM, Michael Stone mst...@mathom.usmailto:mst...@mathom.us wrote: I've also been having trouble running 1.3.0 on HDP. The spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 configuration directive seems to work with pyspark, but not propagate when using spark-shell. (That is, everything works find with pyspark, and spark-shell fails with the bad substitution message.) Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org
Re: Spark hanging after main method completes
I was using sbt, and I found that I actually had specified Spark 0.9.1 there. Once I upgraded my sbt config file to use 1.3.0, and Scala to 2.10.4, the problem went away. Michael -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-hanging-after-main-method-completes-tp22544p22546.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
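For anyone hitting the same hang, the relevant sbt settings after the upgrade would look roughly like the sketch below; the rest of the build definition is assumed.

// build.sbt (sketch): align the Scala version and the Spark dependency with the cluster
scalaVersion := "2.10.4"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.0" % "provided"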
Re: Why does the HDFS parquet file generated by Spark SQL have different size with those on Tachyon?
It's because you did a repartition -- which rearranges all the data. Parquet uses all kinds of compression techniques such as dictionary encoding and run-length encoding, which would result in the size difference when the data is ordered different. On Fri, Apr 17, 2015 at 4:51 AM, zhangxiongfei zhangxiongfei0...@163.com wrote: Hi, I did some tests on Parquet Files with Spark SQL DataFrame API. I generated 36 gzip compressed parquet files by Spark SQL and stored them on Tachyon,The size of each file is about 222M.Then read them with below code. val tfs =sqlContext.parquetFile(tachyon://datanode8.bitauto.dmp:19998/apps/tachyon/adClick); Next,I just save this DataFrame onto HDFS with below code.It will generate 36 parquet files too,but the size of each file is about 265M tfs.repartition(36).saveAsParquetFile(/user/zhangxf/adClick-parquet-tachyon); My question is Why the files on HDFS has different size with those on Tachyon even though they come from the same original data? Thanks Zhang Xiongfei
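A quick way to check this explanation is to write the same DataFrame without the repartition, which preserves the original row ordering; the resulting HDFS files should then compress to roughly the same sizes as the Tachyon ones. The output path below is hypothetical.

// Write without repartition so the row ordering (and hence the encoding efficiency) is unchanged
tfs.saveAsParquetFile("/user/zhangxf/adClick-parquet-hdfs-norepartition")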
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
You don’t need to put any yarn assembly in hdfs. The spark assembly jar will include everything. It looks like your package does not include yarn module, although I didn’t find anything wrong in your mvn command. Can you check whether the ExecutorLauncher class is in your jar file or not? BTW: For spark-1.3, you can use the binary distribution from apache. Thanks. Zhan Zhang On Apr 17, 2015, at 2:01 PM, Udit Mehta ume...@groupon.commailto:ume...@groupon.com wrote: I followed the steps described above and I still get this error: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher I am trying to build spark 1.3 on hdp 2.2. I built spark from source using: build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Maybe I am not putting the correct yarn assembly on hdfs or some other issue? Thanks, Udit On Mon, Mar 30, 2015 at 10:18 AM, Zhan Zhang zzh...@hortonworks.commailto:zzh...@hortonworks.com wrote: Hi Folks, Just to summarize it to run SPARK on HDP distribution. 1. The spark version has to be 1.3.0 and above if you are using upstream distribution. This configuration is mainly for HDP rolling upgrade purpose, and the patch only went into spark upstream from 1.3.0. 2. In $SPARK_HOME/conf/sp[ark-defaults.conf, adding following settings. spark.driver.extraJavaOptions -Dhdp.version=x spark.yarn.am.extraJavaOptions -Dhdp.version=x 3. In $SPARK_HOME/java-opts, add following options. -Dhdp.version=x Thanks. Zhan Zhang On Mar 30, 2015, at 6:56 AM, Doug Balog doug.sparku...@dugos.commailto:doug.sparku...@dugos.com wrote: The “best” solution to spark-shell’s problem is creating a file $SPARK_HOME/conf/java-opts with “-Dhdp.version=2.2.0.0-2014” Cheers, Doug On Mar 28, 2015, at 1:25 PM, Michael Stone mst...@mathom.usmailto:mst...@mathom.us wrote: I've also been having trouble running 1.3.0 on HDP. The spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 configuration directive seems to work with pyspark, but not propagate when using spark-shell. (That is, everything works find with pyspark, and spark-shell fails with the bad substitution message.) Mike Stone - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
You probably want to first try the basic configuration to see whether it works, instead of setting SPARK_JAR pointing to the hdfs location. This error is caused by not finding ExecutorLauncher on the class path, and is not HDP specific, I think. Thanks. Zhan Zhang On Apr 17, 2015, at 2:26 PM, Udit Mehta ume...@groupon.com wrote: I am just trying to launch a spark shell and not do anything fancy. I got the binary distribution from apache and put the spark assembly on hdfs. I then specified the spark.yarn.jar option in spark defaults to point to the assembly in hdfs. I still got the same error, so I thought I had to build it for hdp. I am using hdp 2.2 with hadoop 2.6. On Fri, Apr 17, 2015 at 2:21 PM, Udit Mehta ume...@groupon.com wrote: Thanks. Would that distribution work for hdp 2.2?
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Hi, This is the log trace: https://gist.github.com/uditmehta27/511eac0b76e6d61f8b47 On the yarn RM UI, I see: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher The command I run is: bin/spark-shell --master yarn-client The spark defaults I use are: spark.yarn.jar hdfs://namenode1-dev.snc1:8020/spark/spark-assembly-1.3.0-hadoop2.4.0.jar spark.yarn.access.namenodes hdfs://namenode1-dev.snc1:8032 spark.dynamicAllocation.enabled false spark.scheduler.mode FAIR spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041 spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041 Is there anything wrong in what I am trying to do? Thanks again! On Fri, Apr 17, 2015 at 2:56 PM, Zhan Zhang zzh...@hortonworks.com wrote: Hi Udit, By the way, do you mind sharing the whole log trace? Thanks. Zhan Zhang
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
I followed the steps described above and I still get this error: Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher I am trying to build spark 1.3 on hdp 2.2. I built spark from source using: build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -DskipTests package Maybe I am not putting the correct yarn assembly on hdfs or some other issue? Thanks, Udit
Announcing Spark 1.3.1 and 1.2.2
Hi All, I'm happy to announce the Spark 1.3.1 and 1.2.2 maintenance releases. We recommend all users on the 1.3 and 1.2 Spark branches upgrade to these releases, which contain several important bug fixes. Download Spark 1.3.1 or 1.2.2: http://spark.apache.org/downloads.html Release notes: 1.3.1: http://spark.apache.org/releases/spark-release-1-3-1.html 1.2.2: http://spark.apache.org/releases/spark-release-1-2-2.html Comprehensive list of fixes: 1.3.1: http://s.apache.org/spark-1.3.1 1.2.2: http://s.apache.org/spark-1.2.2 Thanks to everyone who worked on these releases! - Patrick
How to avoid “Invalid checkpoint directory” error in apache Spark?
I'm using Amazon EMR + S3 as my spark cluster infrastructure. When I run a job with periodic checkpointing (it has a long dependency tree, so truncating the lineage by checkpointing is mandatory; each checkpoint has 320 partitions), the job stops halfway, resulting in an exception: (On driver) org.apache.spark.SparkException: Invalid checkpoint directory: s3n://spooky-checkpoint/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198 at org.apache.spark.rdd.CheckpointRDD.getPartitions(CheckpointRDD.scala:54) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) ... (On Executor) 15/04/17 22:00:14 WARN StorageService: Encountered 4 Internal Server error(s), will retry in 800ms 15/04/17 22:00:15 WARN RestStorageService: Retrying request following error response: PUT '/9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198/part-00025' -- ResponseCode: 500, ResponseStatus: Internal Server Error ... After manually checking the checkpointed files I found that /9e9dbddf-e5d8-478d-9b69-b5b966126d3c/rdd-198/part-00025 is indeed missing on S3. So my question is: if it is missing (perhaps due to an AWS malfunction), why didn't Spark detect it immediately during checkpointing (so it could be retried), instead of throwing an irrecoverable error stating that the dependency tree is already lost? And how can I avoid this situation from happening again? I don't think this problem has been addressed before, because HDFS is assumed to be immediately consistent (unlike S3, which is eventually consistent) and extremely resilient. However, every component has a chance of breaking down; can you share your best practices for checkpointing?
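For reference, a minimal sketch of the checkpointing pattern in question plus a cheap guard (the bucket name is taken from the error above; whether the assert can catch an eventually-consistent miss depends on S3 timing, so treat it as a sanity check rather than a fix, and checkpointing to HDFS then copying to S3 afterwards sidesteps the problem entirely):

sc.setCheckpointDir("s3n://spooky-checkpoint")   // or an HDFS path, which avoids the consistency issue

var rdd = sc.parallelize(1 to 1000, 320)
for (i <- 1 to 50) {
  rdd = rdd.map(_ + 1)
  if (i % 10 == 0) {
    rdd.checkpoint()
    rdd.count()                 // forces the checkpoint files to be written
    assert(rdd.isCheckpointed)  // sanity check before relying on the truncated lineage
  }
}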
Can't get SparkListener to work
I'm trying to create a simple SparkListener to get notified of errors on executors. I do not get any callbacks on my SparkListener. Here is some simple code I'm executing in spark-shell. But I still don't get any callbacks on my listener. Am I doing something wrong? Thanks for any clue you can send my way. Cheers Praveen == import org.apache.spark.scheduler.SparkListener import org.apache.spark.scheduler.SparkListenerApplicationStart import org.apache.spark.scheduler.SparkListenerApplicationEnd import org.apache.spark.SparkException sc.addSparkListener(new SparkListener() { override def onApplicationStart(applicationStart: SparkListenerApplicationStart) { println("onApplicationStart: " + applicationStart.appName); } override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) { println("onApplicationEnd: " + applicationEnd.time); } }); sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect(); === output: scala> org.apache.spark.SparkException: hshsh at $iwC$$iwC$$iwC$$iwC.<init>(<console>:29) at $iwC$$iwC$$iwC.<init>(<console>:34) at $iwC$$iwC.<init>(<console>:36) at $iwC.<init>(<console>:38)
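Worth noting, though it isn't pointed out in the thread: in map(throw new SparkException("test")), the throw is evaluated eagerly on the driver while the argument to map is being built, so the exception fires before any job is submitted and no task ever runs; the REPL-only $iwC frames in the output are consistent with that. A sketch that actually fails inside a task on an executor:

import org.apache.spark.SparkException

sc.parallelize(List(1, 2, 3)).map { i =>
  if (i == 2) throw new SparkException("test") else i  // thrown inside the task, on an executor
}.collect()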
Re: Can't get SparkListener to work
when you start the spark-shell, it's already too late to get the ApplicationStart event. Try listening for StageCompleted or JobEnd instead.
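A minimal sketch of that suggestion (run in spark-shell; the println text is illustrative):

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerStageCompleted}

sc.addSparkListener(new SparkListener() {
  override def onStageCompleted(stage: SparkListenerStageCompleted): Unit =
    println("stage " + stage.stageInfo.stageId + " completed, failureReason=" + stage.stageInfo.failureReason)
  override def onJobEnd(job: SparkListenerJobEnd): Unit =
    println("job " + job.jobId + " ended with result " + job.jobResult)
})

sc.parallelize(1 to 3).map(_ * 2).collect()   // triggers stage-completed and job-end events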
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Besides the hdp.version in spark-defaults.conf, I think you probably forgot to put the file java-opts under $SPARK_HOME/conf with the following contents. [root@c6402 conf]# pwd /usr/hdp/current/spark-client/conf [root@c6402 conf]# ls fairscheduler.xml.template java-opts log4j.properties.template metrics.properties.template spark-defaults.conf spark-env.sh hive-site.xml log4j.properties metrics.properties slaves.template spark-defaults.conf.template spark-env.sh.template [root@c6402 conf]# more java-opts -Dhdp.version=2.2.0.0-2041 [root@c6402 conf]# Thanks. Zhan Zhang
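Pulling the thread's fix together, a minimal sketch of the two files involved (the hdp.version value must match your cluster's HDP build; 2.2.0.0-2041 below is only an example; and, as the directory listing above shows, java-opts lives under conf/):

$SPARK_HOME/conf/spark-defaults.conf:
spark.driver.extraJavaOptions -Dhdp.version=2.2.0.0-2041
spark.yarn.am.extraJavaOptions -Dhdp.version=2.2.0.0-2041

$SPARK_HOME/conf/java-opts:
-Dhdp.version=2.2.0.0-2041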
Re: HDP 2.2 AM abort : Unable to find ExecutorLauncher class
Thanks Zhang, that solved the error. This is probably not documented anywhere so I missed it. Thanks again, Udit
RE: ClassCastException processing date fields using spark SQL since 1.3.0
Thank you for the explanation! I’ll check what can be done here. From: Krist Rastislav [mailto:rkr...@vub.sk] Sent: Friday, April 17, 2015 9:03 PM To: Wang, Daoyuan; Michael Armbrust Cc: user Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0 So finally, org.apache.spark.sql.catalyst.ScalaReflection#convertToCatalyst was the method I was looking for (this is how it is done with case classes at least, so it should be good for me too ;-)) My problem is thus solved... Someone should put that method also in JdbcRDD to make it work again. Sorry for spamming you ;-) Thank you very much, best regards R.Krist From: Krist Rastislav Sent: Friday, April 17, 2015 11:57 AM To: 'Wang, Daoyuan'; 'Michael Armbrust' Cc: 'user' Subject: RE: ClassCastException processing date fields using spark SQL since 1.3.0 Hello again, steps to reproduce the same problem in JdbcRDD: - create a table containing a Date field in your favourite DBMS; I used PostgreSQL:
CREATE TABLE spark_test ( pk_spark_test integer NOT NULL, text character varying(25), date1 date, CONSTRAINT pk PRIMARY KEY (pk_spark_test) ) WITH ( OIDS=FALSE );
ALTER TABLE spark_test OWNER TO postgres;
GRANT ALL ON TABLE spark_test TO postgres;
GRANT ALL ON TABLE spark_test TO public;
- fill it with data:
insert into spark_test(pk_spark_test, text, date1) values (1, 'one', '2014-04-01')
insert into spark_test(pk_spark_test, text, date1) values (2, 'two', '2014-04-02')
- from the Scala REPL, try the following:
import org.apache.spark.sql.SQLContext
val sqc = new SQLContext(sc)
sqc.jdbc("jdbc:postgresql://localhost:5432/ebx_repository?schema=ebx_repository&user=abc&password=def", "spark_test").cache.registerTempTable("spark_test") // don’t forget the cache method
sqc.sql("select * from spark_test").foreach(println)
the last command will produce the following error (if you don’t use cache, it will produce correct results as expected): 11:50:27.306 [Executor task launch worker-0] ERROR org.apache.spark.executor.Executor - Exception in task 0.0 in stage 0.0 (TID 0) java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableAny cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableInt at org.apache.spark.sql.catalyst.expressions.SpecificMutableRow.getInt(SpecificMutableRow.scala:248) ~[spark-catalyst_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.IntColumnStats.gatherStats(ColumnStats.scala:191) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.NullableColumnBuilder$class.appendFrom(NullableColumnBuilder.scala:56) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.NativeColumnBuilder.org$apache$spark$sql$columnar$compression$CompressibleColumnBuilder$$super$appendFrom(ColumnBuilder.scala:87) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.compression.CompressibleColumnBuilder$class.appendFrom(CompressibleColumnBuilder.scala:78) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.NativeColumnBuilder.appendFrom(ColumnBuilder.scala:87) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:135) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$3$$anon$1.next(InMemoryColumnarTableScan.scala:111) ~[spark-sql_2.11-1.3.0.jar:1.3.0] at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:249) ~[spark-core_2.11-1.3.0.jar:1.3.0] at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) ~[spark-core_2.11-1.3.0.jar:1.3.0] at org.apache.spark.scheduler.Task.run(Task.scala:64) ~[spark-core_2.11-1.3.0.jar:1.3.0] at
Re: External JARs not loading Spark Shell Scala 2.11
You are running on 2.11.6, right? of course, it seems like that should all work, but it doesn't work for you. My point is that the shell you are saying doesn't work is Scala's 2.11.2 shell -- with some light modification. It's possible that the delta is the problem. I can't entirely make out whether the errors are Spark-specific; they involve Spark classes in some cases but they're assertion errors from Scala libraries. I don't know if this shell is supposed to work even across maintenance releases as-is, though that would be very nice. It's not an API for Scala. A good test of whether this idea has any merit would be to run with Scala 2.11.2. I'll copy this to JIRA for continuation. On Fri, Apr 17, 2015 at 10:31 PM, Michael Allman mich...@videoamp.com wrote: H... I don't follow. The 2.11.x series is supposed to be binary compatible against user code. Anyway, I was building Spark against 2.11.2 and still saw the problems with the REPL. I've created a bug report: https://issues.apache.org/jira/browse/SPARK-6989 I hope this helps. Cheers, Michael On Apr 17, 2015, at 1:41 AM, Sean Owen so...@cloudera.com wrote: Doesn't this reduce to "Scala isn't compatible with itself across maintenance releases"? Meaning, if this were fixed then Scala 2.11.{x < 6} would have similar failures. It's not not-ready; it's just not the Scala 2.11.6 REPL. Still, sure, I'd favor breaking the unofficial support to at least make the latest Scala 2.11 the unbroken one. On Fri, Apr 17, 2015 at 7:58 AM, Michael Allman mich...@videoamp.com wrote: FWIW, this is an essential feature to our use of Spark, and I'm surprised it's not advertised clearly as a limitation in the documentation. All I've found about running Spark 1.3 on 2.11 is here: http://spark.apache.org/docs/latest/building-spark.html#building-for-scala-211 Also, I'm experiencing some serious stability problems simply trying to run the Spark 1.3 Scala 2.11 REPL. Most of the time it fails to load and spews a torrent of compiler assertion failures, etc. See attached. Unfortunately, it appears the Spark 1.3 Scala 2.11 REPL is simply not ready for production use. I was going to file a bug, but it seems clear that the current implementation is going to need to be forward-ported to Scala 2.11.6 anyway. We already have an issue for that: https://issues.apache.org/jira/browse/SPARK-6155 Michael On Apr 9, 2015, at 10:29 PM, Prashant Sharma scrapco...@gmail.com wrote: You will have to go to this commit ID 191d7cf2a655d032f160b9fa181730364681d0e7 in Apache spark. [1] Once you are at that commit, you need to review the changes done to the repl code and look for the relevant occurrences of the same code in the scala 2.11 repl source and somehow make it all work. Thanks, 1. http://githowto.com/getting_old_versions Prashant Sharma On Thu, Apr 9, 2015 at 4:40 PM, Alex Nakos ana...@gmail.com wrote: Ok, what do I need to do in order to migrate the patch? Thanks Alex On Thu, Apr 9, 2015 at 11:54 AM, Prashant Sharma scrapco...@gmail.com wrote: This is the jira I referred to: https://issues.apache.org/jira/browse/SPARK-3256. Another reason for not working on it is evaluating the priority between upgrading to scala 2.11.5 (it is non trivial I suppose, because the repl has changed a bit) or migrating that patch, which is much simpler. Prashant Sharma On Thu, Apr 9, 2015 at 4:16 PM, Alex Nakos ana...@gmail.com wrote: Hi- Was this the JIRA issue? https://issues.apache.org/jira/browse/SPARK-2988 Any help in getting this working would be much appreciated! Thanks Alex On Thu, Apr 9, 2015 at 11:32 AM, Prashant Sharma scrapco...@gmail.com wrote: You are right, this needs to be done. I can work on it soon; I was not sure if there is anyone even using the scala 2.11 spark repl. Actually there is a patch in the scala 2.10 shell to support adding jars (lost the JIRA ID), which has to be ported for scala 2.11 too. If, however, you (or anyone else) are planning to work on it, I can help you. Prashant Sharma On Thu, Apr 9, 2015 at 3:08 PM, anakos ana...@gmail.com wrote: Hi- I am having difficulty getting the 1.3.0 Spark shell to find an external jar. I have built Spark locally for Scala 2.11 and I am starting the REPL as follows: bin/spark-shell --master yarn --jars data-api-es-data-export-4.0.0.jar I see the following line in the console output: 15/04/09 09:52:15 INFO spark.SparkContext: Added JAR file:/opt/spark/spark-1.3.0_2.11-hadoop2.3/data-api-es-data-export-4.0.0.jar at http://192.168.115.31:54421/jars/data-api-es-data-export-4.0.0.jar with timestamp 1428569535904 but when I try to import anything from this jar, it's simply not available. When I try to add the jar manually using the command :cp /path/to/jar the classes in the jar are still unavailable. I understand that 2.11 is not officially supported, but has anyone been able to get an external jar loaded in the 1.3.0 release? Is this a
Re: External JARs not loading Spark Shell Scala 2.11
I actually just saw your comment on SPARK-6989 before this message. So I'll copy to the mailing list: I'm not sure I understand what you mean about running on 2.11.6. I'm just running the spark-shell command. It in turn is running java -cp /opt/spark/conf:/opt/spark/lib/spark-assembly-1.3.2-SNAPSHOT-hadoop2.5.0-cdh5.3.3.jar:/etc/hadoop/conf:/opt/spark/lib/jline-2.12.jar \ -Dscala.usejavacp=true -Xms512m -Xmx512m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main spark-shell I built Spark with the included build/mvn script. As far as I can tell, the only reference to a specific version of Scala is in the top-level pom file, and it says 2.11.2.
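For reference, the documented way to produce a Scala 2.11 build of Spark 1.3 (per the building-spark page linked earlier; the exact profiles depend on your Hadoop version, so treat this as a sketch) is roughly:

dev/change-version-to-2.11.sh
mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

If the change-version script is not run first, the build stays on the default Scala 2.10 profile.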
local directories for spark running on yarn
According to the documentation: "The local directories used by Spark executors will be the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored." (https://spark.apache.org/docs/1.2.1/running-on-yarn.html) If spark.local.dir is specified, the yarn local directory will be ignored, right? It's a little ambiguous to me.
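Reading the quoted sentence literally, it is the other way around: on YARN the executors use the directories from yarn.nodemanager.local-dirs, and a user-set spark.local.dir is what gets ignored. A small sketch of where each setting lives (paths are illustrative):

yarn-site.xml: yarn.nodemanager.local-dirs = /data1/yarn/local,/data2/yarn/local (this is what Spark executors on YARN actually use)
spark-defaults.conf: spark.local.dir /tmp/spark-local (ignored for processes running in YARN containers)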
Re: Can't get SparkListener to work
Thanks for the response, Imran. I probably chose the wrong methods for this email. I implemented all methods of SparkListener and the only callback I get is onExecutorMetricsUpdate. Here's the complete code: == import org.apache.spark.scheduler._ import org.apache.spark.SparkException sc.addSparkListener(new SparkListener() { override def onStageCompleted(e: SparkListenerStageCompleted) = println("onStageCompleted"); override def onStageSubmitted(e: SparkListenerStageSubmitted) = println("onStageSubmitted"); override def onTaskStart(e: SparkListenerTaskStart) = println("onTaskStart"); override def onTaskGettingResult(e: SparkListenerTaskGettingResult) = println("onTaskGettingResult"); override def onTaskEnd(e: SparkListenerTaskEnd) = println("onTaskEnd"); override def onJobStart(e: SparkListenerJobStart) = println("onJobStart"); override def onJobEnd(e: SparkListenerJobEnd) = println("onJobEnd"); override def onEnvironmentUpdate(e: SparkListenerEnvironmentUpdate) = println("onEnvironmentUpdate"); override def onBlockManagerAdded(e: SparkListenerBlockManagerAdded) = println("onBlockManagerAdded"); override def onBlockManagerRemoved(e: SparkListenerBlockManagerRemoved) = println("onBlockManagerRemoved"); override def onUnpersistRDD(e: SparkListenerUnpersistRDD) = println("onUnpersistRDD"); override def onApplicationStart(e: SparkListenerApplicationStart) = println("onApplicationStart"); override def onApplicationEnd(e: SparkListenerApplicationEnd) = println("onApplicationEnd"); override def onExecutorMetricsUpdate(e: SparkListenerExecutorMetricsUpdate) = println("onExecutorMetricsUpdate"); }); sc.parallelize(List(1, 2, 3)).map(throw new SparkException("test")).collect(); = On Fri, Apr 17, 2015 at 4:13 PM, Imran Rashid iras...@cloudera.com wrote: when you start the spark-shell, it's already too late to get the ApplicationStart event. Try listening for StageCompleted or JobEnd instead.
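For the original goal of being notified of errors on executors, a sketch that seems closer to it (Spark 1.3 listener API, run in spark-shell; note that collect() itself still fails on the driver once the task retries are exhausted, while the listener fires for each failed attempt):

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.{ExceptionFailure, SparkException}

sc.addSparkListener(new SparkListener() {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
    case e: ExceptionFailure =>
      println("task failed on an executor: " + e.className + ": " + e.description)
    case _ => // successful or otherwise uninteresting task endings
  }
})

// the exception is thrown inside the task body, so it happens on an executor
sc.parallelize(1 to 4, 2).map { i =>
  if (i == 3) throw new SparkException("boom") else i
}.collect()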