Spark DataFrame sum of multiple columns
Hi, Is there any way to perform row-level operations on Spark DataFrames? For example, I have a DataFrame with columns A, B, C, ... Z, and I want to add one more column, New Column, holding the sum of all the column values in each row:

A  B  C  D  ...  Z   New Column
1  2  4  3  ...  26  351

Can somebody help me with this? Thanks, Naveen
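One way to picture the row-level sum asked about here is a fold over the column values of each row. The sketch below models a row as a plain Python dict rather than a Spark DataFrame, so add_row_sum and the sample row are hypothetical illustration names, not Spark API. In Spark itself the same shape is usually written by reducing the column expressions with + and passing the result to withColumn.

```python
import string

def add_row_sum(row, new_column="New Column"):
    """Return a copy of `row` with one extra key holding the sum of all values."""
    total = sum(row.values())
    result = dict(row)          # copy so the input row is left untouched
    result[new_column] = total
    return result

# Hypothetical sample row: columns A..Z holding the values 1..26
# (the row in the question sums to the same total).
sample_row = {letter: i for i, letter in enumerate(string.ascii_uppercase, start=1)}
summed = add_row_sum(sample_row)
print(summed["New Column"])
```

Returning a copy instead of mutating the row mirrors how a DataFrame operation yields a new DataFrame rather than changing the original.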
reading EOF exception while reading parquet file from hadoop
Hi, I am trying to read parquet file(for ex: one.parquet) I am creating rdd out of it like .. My program In scala like below... val data = sqlContext.read.parquet("hdfs://machine:port/home/user/one.parquet").rdd.map { x => (x.getString(0),x) } data.count() I am using spark 1.4 and Hadoop 2.4 java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at parquet.hadoop.ParquetInputSplit.readArray(ParquetInputSplit.java:240) at parquet.hadoop.ParquetInputSplit.readUTF8(ParquetInputSplit.java:230) at parquet.hadoop.ParquetInputSplit.readFields(ParquetInputSplit.java:197) at org.apache.hadoop.io.ObjectWritable.readObject(ObjectWritable.java:285) at org.apache.hadoop.io.ObjectWritable.readFields(ObjectWritable.java:77) at org.apache.spark.SerializableWritable$$anonfun$readObject$1.apply$mcV$sp(SerializableWritable.scala:45) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1239) at org.apache.spark.SerializableWritable.readObject(SerializableWritable.scala:41) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993) at 
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:69) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:95) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Thanks, Naveen Kumar Pokala
Standard deviation on multiple columns
Hi, I am using Spark 1.6.0. I want to find the standard deviation of columns whose names are supplied dynamically.

val stdDevOnAll = columnNames.map { x => stddev(x) }
causalDf.groupBy(causalDf("A"), causalDf("B"), causalDf("C"))
  .agg(stdDevOnAll: _*) // error line

I am trying the above, but it gives me the compilation error below.

overloaded method value agg with alternatives: (expr: org.apache.spark.sql.Column,exprs: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame (exprs: java.util.Map[String,String])org.apache.spark.sql.DataFrame (exprs: scala.collection.immutable.Map[String,String])org.apache.spark.sql.DataFrame (aggExpr: (String, String),aggExprs: (String, String)*)org.apache.spark.sql.DataFrame cannot be applied to (org.apache.spark.sql.Column)

Naveen
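The first alternative listed in the error, agg(expr: Column, exprs: Column*), takes one Column followed by varargs, so a bare sequence expanded with : _* does not match it; a common workaround is to pass the first expression and the rest separately, as in agg(stdDevOnAll.head, stdDevOnAll.tail: _*). To make the intended result concrete, the sketch below computes a per-group sample standard deviation for a run-time list of columns in plain Python, not Spark. The names grouped_stddev, rows, and the sample values are hypothetical.

```python
from collections import defaultdict
from statistics import stdev

def grouped_stddev(rows, group_keys, value_columns):
    """Group `rows` by `group_keys` and return the sample standard deviation
    of each column in `value_columns` for every group."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in group_keys)].append(row)
    result = {}
    for key, members in groups.items():
        # statistics.stdev needs at least two data points; use None otherwise.
        result[key] = {
            col: stdev([m[col] for m in members]) if len(members) > 1 else None
            for col in value_columns
        }
    return result

# Hypothetical data: group by A, B, C; the deviation columns are chosen at run time.
rows = [
    {"A": 1, "B": 1, "C": 1, "X": 2.0, "Y": 10.0},
    {"A": 1, "B": 1, "C": 1, "X": 4.0, "Y": 10.0},
    {"A": 2, "B": 1, "C": 1, "X": 5.0, "Y": 7.0},
]
column_names = ["X", "Y"]
stats = grouped_stddev(rows, ["A", "B", "C"], column_names)
print(stats)
```

The single-row group yields None here because a sample standard deviation is undefined for one value; how a given Spark version reports that case is worth checking separately.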
Failed to locate the winutils binary in the hadoop binary path
Hi, I am facing the following issue when I am connecting from spark-shell. Please tell me how to avoid it. 15/01/29 17:21:27 ERROR Shell: Failed to locate the winutils binary in the hadoop binary path java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333) at org.apache.hadoop.util.Shell.(Shell.java:326) at org.apache.hadoop.util.StringUtils.(StringUtils.java:76) at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93) at org.apache.hadoop.security.Groups.(Groups.java:77) at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240) at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255) at org.apache.hadoop.security.UserGroupInformation.setConfiguration(UserGroupInformation.java:283) at org.apache.spark.deploy.SparkHadoopUtil.(SparkHadoopUtil.scala:36) at org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala:109) at org.apache.spark.deploy.SparkHadoopUtil$.(SparkHadoopUtil.scala) at org.apache.spark.SparkContext.(SparkContext.scala:228) at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:972) at $line3.$read$$iwC$$iwC.(:8) at $line3.$read$$iwC.(:14) at $line3.$read.(:16) at $line3.$read$.(:20) at $line3.$read$.() at $line3.$eval$.(:7) at $line3.$eval$.() at $line3.$eval.$print() at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) at 
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:121) at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:120) at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:264) at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:120) at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:56) at org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:931) at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:142) at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:56) at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:104) at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:56) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:948) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) at org.apache.spark.repl.Main$.main(Main.scala:31) at org.apache.spark.repl.Main.main(Main.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 15/01/29 17:21:28 INFO Executor: Using REPL class URI: http://172.22.5.79:60436 15/01/29 17:21:28 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkdri...@ii01-hdhlg32.ciqhyd.com:60464/user/HeartbeatReceiver 15/
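The null in null\bin\winutils.exe suggests that no Hadoop home directory was set when the shell started. Hadoop looks for winutils.exe under the bin folder of the directory named by the HADOOP_HOME environment variable (or the hadoop.home.dir Java system property). A minimal sketch for checking that setup before launching spark-shell follows; find_winutils is a hypothetical helper name, and it only inspects the environment variable, not the system property.

```python
import os

def find_winutils(env=None):
    """Return the expected winutils.exe path if it exists, else None.

    Looks under <HADOOP_HOME>/bin, mirroring the path in the error message.
    """
    env = os.environ if env is None else env
    hadoop_home = env.get("HADOOP_HOME")
    if not hadoop_home:
        return None  # matches the case where the error prints "null" as the home
    candidate = os.path.join(hadoop_home, "bin", "winutils.exe")
    return candidate if os.path.isfile(candidate) else None

path = find_winutils()
print(path if path else "HADOOP_HOME is unset or bin/winutils.exe is missing")
```

If the check reports a missing file, placing a winutils.exe that matches the Hadoop build under that bin folder and setting HADOOP_HOME accordingly is the usual remedy.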
Pyspark Interactive shell
Hi, Has anybody tried to connect to a Spark cluster (on UNIX machines) from the PySpark interactive shell on Windows? -Naveen.
Re: pyspark.daemon not found
Hi, I am receiving the following error when I am trying to connect spark cluster( which is on unix) from my windows machine using pyspark interactive shell > pyspark -master (spark cluster url) Then I executed the following commands. >>>>lines = sc.textFile("hdfs://master/data/spark/SINGLE.TXT") >>>>lineLengths = lines.map(lambda s: len(s)) >>>>totalLength = lineLengths.reduce(lambda a, b: a + b) I got the following Error 14/12/31 16:20:15 INFO DAGScheduler: Job 0 failed: reduce at :1, took 6.960438 s Traceback (most recent call last): File "", line 1, in File "C:\Users\npokala\Downloads\spark-master\python\pyspark\rdd.py", line 715, in reduce vals = self.mapPartitions(func).collect() File "C:\Users\npokala\Downloads\spark-master\python\pyspark\rdd.py", line 676, in collect bytesInJava = self._jrdd.collect().iterator() File "C:\Users\npokala\Downloads\spark-master\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 538, in __call__ File "C:\Users\npokala\Downloads\spark-master\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o23.collect. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 7, Error from python worker: python: module pyspark.daemon not found PYTHONPATH was: /home/npokala/data/spark-install/spark-master/python:/home/npokala/data/spark-install/spark-master/python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-ins java.io.EOFException at java.io.DataInputStream.readInt(DataInputStream.java:392) at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:163) at org.apache.spark.api.python.PythonWorkerFactory.createThroughDaemon(PythonWorkerFactory.scala:86) at org.apache.spark.api.python.PythonWorkerFactory.create(PythonWorkerFactory.scala:62) at org.apache.spark.SparkEnv.createPythonWorker(SparkEnv.scala:102) at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:70) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:265) at org.apache.spark.rdd.RDD.iterator(RDD.scala:232) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Please help me to resolve this issue. -Naveen From: Naveen Kumar Pokala [mailto:npok...@spcapitaliq.com] Sent: Wednesday, December 31,
pyspark.daemon not found
Error from python worker: python: module pyspark.daemon not found PYTHONPATH was: /home/npokala/data/spark-install/spark-master/python: Can somebody please help me resolve this issue? -Naveen
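The worker reports the PYTHONPATH it used, so one quick check is whether any entry on that path actually provides the pyspark package with a daemon module, when run on the worker host with the same Python interpreter the executors use. The sketch below does that with the standard library only; has_pyspark_daemon and check_pythonpath are hypothetical helpers, and they only handle plain directories and zip-style archives, which is an assumption about how the path is laid out.

```python
import os
import zipfile

def has_pyspark_daemon(entry):
    """Return True if `entry` (a directory or zip archive) provides pyspark/daemon.py."""
    if os.path.isdir(entry):
        return os.path.isfile(os.path.join(entry, "pyspark", "daemon.py"))
    if os.path.isfile(entry) and zipfile.is_zipfile(entry):
        with zipfile.ZipFile(entry) as archive:
            return "pyspark/daemon.py" in archive.namelist()
    return False  # entry does not exist or is not a supported container

def check_pythonpath(pythonpath):
    """Map each non-empty PYTHONPATH entry to whether it provides pyspark.daemon."""
    entries = [e for e in pythonpath.split(os.pathsep) if e]
    return {entry: has_pyspark_daemon(entry) for entry in entries}

# Usage: run on the worker host; prints one line per PYTHONPATH entry.
for entry, found in check_pythonpath(os.environ.get("PYTHONPATH", "")).items():
    print(("found   " if found else "missing ") + entry)
```

If every entry is reported as missing, the path in the error message points at a location that does not hold the Spark Python sources on that host, or the interpreter picked up by the worker cannot read it.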
python: module pyspark.daemon not found
14/12/29 18:10:56 INFO TaskSetManager: Starting task 0.1 in stage 0.0 (TID 2, nj09mhf0730.mhf.mhc, PROCESS_LOCAL, 1246 bytes) 14/12/29 18:10:56 INFO TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1) on executor nj09mhf0730.mhf.mhc: org.apache.spark.SparkException ( Error from python worker: python: module pyspark.daemon not found PYTHONPATH was: /home/npokala/data/spark-install/spark-master/python:/home/npokala/data/spark-install/spark-master/python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.4.0.jar:/home/npokala/data/spark-install/spark-master/sbin/../python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/sbin/../python: java.io.EOFException) [duplicate 1] 14/12/29 18:10:56 INFO TaskSetManager: Starting task 1.1 in stage 0.0 (TID 3, nj09mhf0731.mhf.mhc, PROCESS_LOCAL, 1246 bytes) 14/12/29 18:10:56 INFO TaskSetManager: Lost task 0.1 in stage 0.0 (TID 2) on executor nj09mhf0730.mhf.mhc: org.apache.spark.SparkException ( Error from python worker: python: module pyspark.daemon not found PYTHONPATH was: /home/npokala/data/spark-install/spark-master/python:/home/npokala/data/spark-install/spark-master/python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.4.0.jar:/home/npokala/data/spark-install/spark-master/sbin/../python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/sbin/../python: java.io.EOFException) [duplicate 2] 14/12/29 18:10:56 INFO TaskSetManager: Starting task 0.2 in stage 0.0 (TID 4, nj09mhf0731.mhf.mhc, PROCESS_LOCAL, 1246 bytes) 14/12/29 18:10:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on nj09mhf0731.mhf.mhc:48802 (size: 3.4 KB, free: 265.1 MB) 14/12/29 18:10:58 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on nj09mhf0731.mhf.mhc:41243 (size: 3.4 KB, free: 265.1 MB) 14/12/29 18:10:59 INFO 
TaskSetManager: Lost task 1.1 in stage 0.0 (TID 3) on executor nj09mhf0731.mhf.mhc: org.apache.spark.SparkException ( Error from python worker: python: module pyspark.daemon not found PYTHONPATH was: /home/npokala/data/spark-install/spark-master/python:/home/npokala/data/spark-install/spark-master/python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.4.0.jar:/home/npokala/data/spark-install/spark-master/sbin/../python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/sbin/../python: java.io.EOFException) [duplicate 3] 14/12/29 18:10:59 INFO TaskSetManager: Starting task 1.2 in stage 0.0 (TID 5, nj09mhf0730.mhf.mhc, PROCESS_LOCAL, 1246 bytes) 14/12/29 18:10:59 INFO TaskSetManager: Lost task 0.2 in stage 0.0 (TID 4) on executor nj09mhf0731.mhf.mhc: org.apache.spark.SparkException ( Error from python worker: python: module pyspark.daemon not found PYTHONPATH was: /home/npokala/data/spark-install/spark-master/python:/home/npokala/data/spark-install/spark-master/python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.4.0.jar:/home/npokala/data/spark-install/spark-master/sbin/../python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/sbin/../python: java.io.EOFException) [duplicate 4] 14/12/29 18:10:59 INFO TaskSetManager: Starting task 0.3 in stage 0.0 (TID 6, nj09mhf0730.mhf.mhc, PROCESS_LOCAL, 1246 bytes) 14/12/29 18:11:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on nj09mhf0730.mhf.mhc:60005 (size: 3.4 KB, free: 265.1 MB) 14/12/29 18:11:00 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on nj09mhf0730.mhf.mhc:40227 (size: 3.4 KB, free: 265.1 MB) 14/12/29 18:11:01 INFO TaskSetManager: Lost task 0.3 in stage 0.0 (TID 6) on executor nj09mhf0730.mhf.mhc: org.apache.spark.SparkException ( Error from python worker: python: module 
pyspark.daemon not found PYTHONPATH was: /home/npokala/data/spark-install/spark-master/python:/home/npokala/data/spark-install/spark-master/python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/assembly/target/scala-2.10/spark-assembly-1.3.0-SNAPSHOT-hadoop2.4.0.jar:/home/npokala/data/spark-install/spark-master/sbin/../python/lib/py4j-0.8.2.1-src.zip:/home/npokala/data/spark-install/spark-master/sbin/../python: java.io.EOFException) [duplicate 5] 14/12/29 18:11:01 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job 14/12/29 18:11:01 INFO TaskSchedulerImpl: Cancelling stage 0 14/12/29 18:11:01 INFO TaskSchedulerImpl: Stage 0 was cancelled 14/12/29 18:11:01 INFO DAGScheduler: Job 0 failed: reduce at D:\WorkSpace\python\spark\src\test\__init__.py:21, took 15.491196 s Traceback (most recent call last): File "D:\WorkSpace\python\spark\src\test\__init__.py", line 21, in count = sc.parallelize(xrange(1, n + 1), partit
RE: Spark Job submit
The code is on my Windows machine and the cluster is on UNIX in a different network. In this case, how will it identify the cluster? For a standalone Spark cluster we can specify the URL explicitly, like spark://ip:port, but how do we specify it for Hadoop? What I have done is copy the Hadoop configuration files from the cluster to my local machine, create a dummy Hadoop directory on the Windows machine, and submit with spark-submit after pointing the HADOOP_CONF_DIR variable at that directory. Attaching the error. Please suggest how to proceed from the code and how to run spark-submit from a Windows machine. Please share sample code if you have any. -Naveen
From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, November 26, 2014 10:03 PM To: Naveen Kumar Pokala Cc: user@spark.apache.org Subject: Re: Spark Job submit
How about?
- Create a SparkContext
- setMaster as yarn-cluster
- Create a JavaSparkContext with the above SparkContext
And that will submit it to the yarn cluster. Thanks Best Regards
On Wed, Nov 26, 2014 at 4:20 PM, Naveen Kumar Pokala <npok...@spcapitaliq.com> wrote: Hi. Is there a way to submit spark job on Hadoop-YARN cluster from java code. -Naveen
Spark Job submit
Hi. Is there a way to submit a Spark job to a Hadoop YARN cluster from Java code? -Naveen
RE: Control number of parquet generated from JavaSchemaRDD
Hi, While submitting your Spark job, specify --executor-cores 2 --num-executors 24; it will divide the dataset into 24*2 parquet files. Or set spark.default.parallelism to a value like 50 on the SparkConf object; it will divide the dataset into 50 files in your HDFS. -Naveen
-Original Message- From: tridib [mailto:tridib.sama...@live.com] Sent: Tuesday, November 25, 2014 9:54 AM To: u...@spark.incubator.apache.org Subject: Control number of parquet generated from JavaSchemaRDD
Hello, I am reading around 1000 input files from disk into an RDD and generating parquet. It always produces the same number of parquet files as the number of input files. I tried to merge them using rdd.coalesce(n) and/or rdd.repartition(n). I also tried: int MB_128 = 128*1024*1024; sc.hadoopConfiguration().setInt("dfs.blocksize", MB_128); sc.hadoopConfiguration().setInt("parquet.block.size", MB_128); No luck. Is there a way to control the size/number of parquet files generated? Thanks Tridib
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Control-number-of-parquet-generated-from-JavaSchemaRDD-tp19717.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Submit Spark driver on Yarn Cluster in client mode
Hi Akhil, But the driver and YARN are in different networks. How do I specify the (export HADOOP_CONF_DIR=XXX) path? For example, the driver runs on my Windows machine and YARN is on a UNIX machine in a different network. -Naveen.
From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Monday, November 24, 2014 4:08 PM To: Naveen Kumar Pokala Cc: user@spark.apache.org Subject: Re: Submit Spark driver on Yarn Cluster in client mode
You can export the hadoop configurations dir (export HADOOP_CONF_DIR=XXX) in the environment and then submit it like:
# --master can also be `yarn-client` for client mode
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \
  --executor-memory 20G \
  --num-executors 50 \
  /path/to/examples.jar \
  1000
More details over here https://spark.apache.org/docs/1.1.0/submitting-applications.html#launching-applications-with-spark-submit Thanks Best Regards
On Mon, Nov 24, 2014 at 4:01 PM, Naveen Kumar Pokala <npok...@spcapitaliq.com> wrote: Hi, I want to submit my spark program from my machine on a YARN Cluster in yarn client mode. How to specify all the required details through SPARK submitter. Please provide me some details. -Naveen.
Submit Spark driver on Yarn Cluster in client mode
Hi, I want to submit my Spark program from my machine to a YARN cluster in yarn-client mode. How do I specify all the required details through spark-submit? Please provide some details. -Naveen.
Execute Spark programs from local machine on Yarn-hadoop cluster
Hi, I am executing my Spark jobs on a YARN cluster by building the conf object in the following way: SparkConf conf = new SparkConf().setAppName("NewJob").setMaster("yarn-cluster"); Now I want to execute Spark jobs from my local machine. How can I do that? What I mean is: is there a way to give the IP address, port, and other details needed to connect to a master (YARN) on some other network from my local Spark program? -Naveen
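For YARN, the master string (yarn-cluster or yarn-client) carries no host or port; Spark picks up the cluster location from the Hadoop configuration directory named by HADOOP_CONF_DIR (or YARN_CONF_DIR). The sketch below shows where that address lives by reading the yarn.resourcemanager.address property out of yarn-site.xml content. The helper name read_hadoop_property and the sample host and port are hypothetical.

```python
import xml.etree.ElementTree as ET

def read_hadoop_property(xml_text, name):
    """Return the <value> of the Hadoop property called `name`, or None."""
    root = ET.fromstring(xml_text)
    for prop in root.iter("property"):
        if prop.findtext("name") == name:
            return prop.findtext("value")
    return None

# Hypothetical yarn-site.xml content; a real file would be read from HADOOP_CONF_DIR.
SAMPLE_YARN_SITE = """<configuration>
  <property>
    <name>yarn.resourcemanager.address</name>
    <value>rm.example.com:8032</value>
  </property>
</configuration>"""

address = read_hadoop_property(SAMPLE_YARN_SITE, "yarn.resourcemanager.address")
print(address)
```

Copying the cluster's configuration files to the local machine only helps if the hosts and ports they name are reachable from that machine, which is the part to verify when the driver sits in a different network.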
RE: Null pointer exception with larger datasets
Thanks Akhil. -Naveen. From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Tuesday, November 18, 2014 1:19 PM To: Naveen Kumar Pokala Cc: user@spark.apache.org Subject: Re: Null pointer exception with larger datasets Make sure your list is not null, if that is null then its more like doing: JavaRDD distData = sc.parallelize(null) distData.foreach(println) Thanks Best Regards On Tue, Nov 18, 2014 at 12:07 PM, Naveen Kumar Pokala mailto:npok...@spcapitaliq.com>> wrote: Hi, I am having list Students and size is one Lakh and I am trying to save the file. It is throwing null pointer exception. JavaRDD distData = sc.parallelize(list); distData.saveAsTextFile("hdfs://master/data/spark/instruments.txt"); 14/11/18 01:33:21 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 0.0 (TID 5, master): java.lang.NullPointerException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) How to handle this? -Naveen
Null pointer exception with larger datasets
Hi, I am having list Students and size is one Lakh and I am trying to save the file. It is throwing null pointer exception. JavaRDD distData = sc.parallelize(list); distData.saveAsTextFile("hdfs://master/data/spark/instruments.txt"); 14/11/18 01:33:21 WARN scheduler.TaskSetManager: Lost task 5.0 in stage 0.0 (TID 5, master): java.lang.NullPointerException: org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1.apply(RDD.scala:1158) scala.collection.Iterator$$anon$11.next(Iterator.scala:328) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:984) org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:974) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) How to handle this? -Naveen
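The trace points into the mapping step of saveAsTextFile, which converts each element to text, so one thing worth checking is whether the list contains null elements rather than whether the list itself is null. That reading of the trace is an assumption, not something confirmed in the thread. The sketch below shows the defensive idea in plain Python: drop missing records before turning them into lines, and count what was dropped. to_lines and the sample records are hypothetical.

```python
def to_lines(records):
    """Convert records to text lines, skipping missing (None) entries.

    Returns the lines plus a count of skipped records so data loss is visible.
    """
    lines = []
    skipped = 0
    for record in records:
        if record is None:
            skipped += 1
            continue
        lines.append(str(record))
    return lines, skipped

# Hypothetical input: one lakh (100,000) records with a few missing entries mixed in.
students = ["student-%d" % i if i % 1000 else None for i in range(100000)]
lines, skipped = to_lines(students)
print(len(lines), skipped)
```

In a Spark job the same idea would be a filter for non-null elements applied before the save, which also explains why the failure might only show up with larger inputs where a null is more likely to occur.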
HDFS read text file
Hi,

JavaRDD studentsData = sc.parallelize(list); // list is the Student info list
studentsData.saveAsTextFile("hdfs://master/data/spark/instruments.txt");

The statements above saved the student information in HDFS as a text file. Each object is one line in the text file (shown in the attached image). How do I read that file back so that each line becomes a Student object? -Naveen
Spark GLIBCXX error
Hi, I am receiving following error when I am trying to run sample spark program. Caused by: java.lang.UnsatisfiedLinkError: /tmp/hadoop-npokala/nm-local-dir/usercache/npokala/appcache/application_1415881017544_0003/container_1415881017544_0003_01_01/tmp/snappy-1.0.5.3-ec0ee911-2f8c-44c0-ae41-5e0b2727d880-libsnappyjava.so: /usr/lib64/libstdc++.so.6: version `GLIBCXX_3.4.9' not found (required by /tmp/hadoop-npokala/nm-local-dir/usercache/npokala/appcache/application_1415881017544_0003/container_1415881017544_0003_01_01/tmp/snappy-1.0.5.3-ec0ee911-2f8c-44c0-ae41-5e0b2727d880-libsnappyjava.so) at java.lang.ClassLoader$NativeLibrary.load(Native Method) at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1929) at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1814) at java.lang.Runtime.load0(Runtime.java:809) at java.lang.System.load(System.java:1083) at org.xerial.snappy.SnappyNativeLoader.load(SnappyNativeLoader.java:39) ... 29 more -Naveen.
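The message says the bundled snappy native library needs the symbol version GLIBCXX_3.4.9, which the libstdc++.so.6 on that host does not provide. A common way to confirm this is to list the GLIBCXX version tags embedded in the library (the equivalent of running strings on it and filtering for GLIBCXX). The sketch below does that in Python; glibcxx_versions and provides are hypothetical helpers, and the path is simply the one from the error message.

```python
import re

def glibcxx_versions(data):
    """Return the sorted GLIBCXX_x.y[.z] version tags found in raw library bytes."""
    tags = set(re.findall(rb"GLIBCXX_(\d+(?:\.\d+)+)", data))
    # Sort numerically so that 3.4.10 comes after 3.4.9.
    return sorted((t.decode("ascii") for t in tags),
                  key=lambda v: tuple(int(p) for p in v.split(".")))

def provides(data, required):
    """True if the library bytes contain the required GLIBCXX version tag."""
    return required in glibcxx_versions(data)

if __name__ == "__main__":
    # Path taken from the error message; adjust for the host being checked.
    try:
        with open("/usr/lib64/libstdc++.so.6", "rb") as lib:
            print(provides(lib.read(), "3.4.9"))
    except OSError as exc:
        print("could not read library:", exc)
```

If the required tag is absent, the usual options are a newer libstdc++ on the cluster nodes or a snappy-java build that matches the installed one.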
Snappy error with Spark SQL
HI, I am facing the following problem when I am trying to save my RDD as parquet File. 14/11/12 07:43:59 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 1.0 (TID 48,): org.xerial.snappy.SnappyError: [FAILED_TO_LOAD_NATIVE_LIBRARY] null org.xerial.snappy.SnappyLoader.load(SnappyLoader.java:236) org.xerial.snappy.Snappy.(Snappy.java:48) parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:64) org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81) org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92) parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:109) parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:70) parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119) parquet.column.impl.ColumnWriterImpl.flush(ColumnWriterImpl.java:199) parquet.column.impl.ColumnWriteStoreImpl.flush(ColumnWriteStoreImpl.java:108) parquet.hadoop.InternalParquetRecordWriter.flushStore(InternalParquetRecordWriter.java:146) parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:110) parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73) org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:305) org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318) org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
java.lang.Thread.run(Thread.java:745) 14/11/12 07:43:59 WARN scheduler.TaskSetManager: Lost task 3.0 in stage 1.0 (TID 51,): java.lang.NoClassDefFoundError: Could not initialize class org.xerial.snappy.Snappy parquet.hadoop.codec.SnappyCompressor.compress(SnappyCompressor.java:64) org.apache.hadoop.io.compress.CompressorStream.compress(CompressorStream.java:81) org.apache.hadoop.io.compress.CompressorStream.finish(CompressorStream.java:92) parquet.hadoop.CodecFactory$BytesCompressor.compress(CodecFactory.java:109) parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writePage(ColumnChunkPageWriteStore.java:70) parquet.column.impl.ColumnWriterImpl.writePage(ColumnWriterImpl.java:119) parquet.column.impl.ColumnWriterImpl.flush(ColumnWriterImpl.java:199) parquet.column.impl.ColumnWriteStoreImpl.flush(ColumnWriteStoreImpl.java:108) parquet.hadoop.InternalParquetRecordWriter.flushStore(InternalParquetRecordWriter.java:146) parquet.hadoop.InternalParquetRecordWriter.close(InternalParquetRecordWriter.java:110) parquet.hadoop.ParquetRecordWriter.close(ParquetRecordWriter.java:73) org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:305) org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318) org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:318) org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) org.apache.spark.scheduler.Task.run(Task.scala:54) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) java.lang.Thread.run(Thread.java:745) Please help me. Regards, Naveen.
RE: Spark SQL configurations
Thanks Akhil. -Naveen
From: Akhil Das [mailto:ak...@sigmoidanalytics.com] Sent: Wednesday, November 12, 2014 6:38 PM To: Naveen Kumar Pokala Cc: user@spark.apache.org Subject: Re: Spark SQL configurations
JavaSQLContext.sqlContext.setConf is available. Thanks Best Regards
On Wed, Nov 12, 2014 at 5:14 PM, Naveen Kumar Pokala <npok...@spcapitaliq.com> wrote: Hi, How do I set the properties shown in the attached image on JavaSQLContext? I cannot see a setConf method on the JavaSQLContext object. I have added the Spark core jar and the Spark assembly jar to my build path, and I am using Spark 1.1.0 and Hadoop 2.4.0. --Naveen
Spark SQL configurations
[image attachment] Hi, How do I set the above properties on JavaSQLContext? I am not able to see a setConf method on the JavaSQLContext object. I have added the Spark core jar and the Spark assembly jar to my build path. I am using Spark 1.1.0 and Hadoop 2.4.0. --Naveen
RE: scala.MatchError
Hi, Do you mean that in Java I shouldn't have the Issue class as a property (attribute) of the Instrument class? Ex: class Issue { int a; } class Instrument { Issue issue; } How about Scala? Does it support such user-defined data types in case classes? case class Issue(a: Int = 0) case class Instrument(issue: Issue = null) -Naveen From: Michael Armbrust [mailto:mich...@databricks.com] Sent: Wednesday, November 12, 2014 12:09 AM To: Xiangrui Meng Cc: Naveen Kumar Pokala; user@spark.apache.org Subject: Re: scala.MatchError Xiangrui is correct that it must be a Java bean; also, nested classes are not yet supported in Java. On Tue, Nov 11, 2014 at 10:11 AM, Xiangrui Meng <men...@gmail.com> wrote: I think you need a Java bean class instead of a normal class. See the example here: http://spark.apache.org/docs/1.1.0/sql-programming-guide.html (switch to the Java tab). -Xiangrui On Tue, Nov 11, 2014 at 7:18 AM, Naveen Kumar Pokala <npok...@spcapitaliq.com> wrote: Hi, This is my Instrument Java constructor. public Instrument(Issue issue, Issuer issuer, Issuing issuing) { super(); this.issue = issue; this.issuer = issuer; this.issuing = issuing; } I am trying to create a JavaSchemaRDD: JavaSchemaRDD schemaInstruments = sqlCtx.applySchema(distData, Instrument.class); Remarks: Instrument, Issue, Issuer and Issuing are all Java classes. distData holds a List<Instrument>. I am getting the following error. Exception in thread "Driver" java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162) Caused by: scala.MatchError: class sample.spark.test.Issue (of class java.lang.Class) at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:189) at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:188) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.sql.api.java.JavaSQLContext.getSchema(JavaSQLContext.scala:188) at org.apache.spark.sql.api.java.JavaSQLContext.applySchema(JavaSQLContext.scala:90) at sample.spark.test.SparkJob.main(SparkJob.java:33) ... 5 more Please help me. Regards, Naveen. To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
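The replies above say the class passed to applySchema must be a Java bean and that nested bean classes are not supported in the Java API of Spark 1.1.0. A minimal sketch of what a flattened bean could look like follows. It uses only the JDK (java.beans.Introspector) to list the properties a reflection-based schema reader would see; no Spark call is made. The class FlatInstrument and its fields issueA and issuerName are hypothetical names chosen for illustration and do not come from the original post.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical flattened bean: the nested Issue and Issuer objects are replaced
// by plain fields of simple types (int, String). Field names are assumptions.
class FlatInstrument implements Serializable {
    private int issueA;
    private String issuerName;

    // Java beans need a public no-argument constructor.
    public FlatInstrument() {}

    public int getIssueA() { return issueA; }
    public void setIssueA(int issueA) { this.issueA = issueA; }

    public String getIssuerName() { return issuerName; }
    public void setIssuerName(String issuerName) { this.issuerName = issuerName; }
}

class BeanSketch {
    // Returns "name:Type" for every bean property of the class,
    // stopping at Object so that getClass() is not reported.
    static List<String> describe(Class<?> beanClass) {
        try {
            BeanInfo info = Introspector.getBeanInfo(beanClass, Object.class);
            List<String> out = new ArrayList<String>();
            for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
                out.add(pd.getName() + ":" + pd.getPropertyType().getSimpleName());
            }
            return out;
        } catch (IntrospectionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Every property here has a simple type; none is a user-defined class.
        System.out.println(describe(FlatInstrument.class));
    }
}
```

If every property reported this way has a primitive, boxed, or String type, the bean has the flat shape the replies ask for. Whether a given type is accepted by applySchema should still be checked against the Spark SQL programming guide for the version in use.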
scala.MatchError
Hi, This is my Instrument Java constructor. public Instrument(Issue issue, Issuer issuer, Issuing issuing) { super(); this.issue = issue; this.issuer = issuer; this.issuing = issuing; } I am trying to create a JavaSchemaRDD: JavaSchemaRDD schemaInstruments = sqlCtx.applySchema(distData, Instrument.class); Remarks: Instrument, Issue, Issuer and Issuing are all Java classes. distData holds a List<Instrument>. I am getting the following error. Exception in thread "Driver" java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162) Caused by: scala.MatchError: class sample.spark.test.Issue (of class java.lang.Class) at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:189) at org.apache.spark.sql.api.java.JavaSQLContext$$anonfun$getSchema$1.apply(JavaSQLContext.scala:188) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108) at org.apache.spark.sql.api.java.JavaSQLContext.getSchema(JavaSQLContext.scala:188) at org.apache.spark.sql.api.java.JavaSQLContext.applySchema(JavaSQLContext.scala:90) at sample.spark.test.SparkJob.main(SparkJob.java:33) ... 5 more Please help me. Regards, Naveen.
save as file
Hi, I am using Spark 1.1.0. I need help saving an RDD as a JSON file. How do I do that? And how do I specify an HDFS path in the program? -Naveen
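One possible approach, offered as a sketch rather than a confirmed answer from this thread: turn each record into a single-line JSON string and write the strings out with saveAsTextFile, passing an hdfs:// URI as the path. The plain-Java code below shows only the per-record formatting step and does not depend on Spark. The class JsonLineSketch and its helpers are hypothetical, and the handling of values is deliberately simple (flat records only).

```java
import java.util.LinkedHashMap;
import java.util.Map;

class JsonLineSketch {
    // Escapes the characters that may not appear raw inside a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // Renders one flat record as a one-line JSON object.
    // Numbers and booleans are written bare, null as null, anything else as a string.
    // Note: NaN and infinite doubles are not valid JSON and are not handled here.
    static String toJsonLine(Map<String, Object> record) {
        StringBuilder sb = new StringBuilder("{");
        boolean first = true;
        for (Map.Entry<String, Object> e : record.entrySet()) {
            if (!first) {
                sb.append(",");
            }
            first = false;
            sb.append('"').append(escape(e.getKey())).append("\":");
            Object v = e.getValue();
            if (v == null) {
                sb.append("null");
            } else if (v instanceof Number || v instanceof Boolean) {
                sb.append(v);
            } else {
                sb.append('"').append(escape(v.toString())).append('"');
            }
        }
        return sb.append("}").toString();
    }

    public static void main(String[] args) {
        Map<String, Object> rec = new LinkedHashMap<String, Object>();
        rec.put("id", 1);
        rec.put("name", "a \"quoted\" name");
        System.out.println(toJsonLine(rec));
        // In a Spark job, a function like toJsonLine could be applied inside rdd.map(...),
        // and the resulting RDD of strings written with
        // saveAsTextFile("hdfs://<namenode-host>:<port>/<output-dir>").
        // Host, port and directory are placeholders, not values from this thread.
    }
}
```

Note that saveAsTextFile writes a directory of part files rather than one file, so the output is one JSON object per line spread across those parts.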
RE: Parallelize on spark context
Hi, In the documentation I found the following. spark.default.parallelism · Local mode: number of cores on the local machine · Mesos fine-grained mode: 8 · Others: total number of cores on all executor nodes or 2, whichever is larger. I am using a 2-node cluster with 48 cores (24+24). As per the above, the data should be split into 48 datasets of 1000/48 = 20.83 elements each, i.e. around 20 or 21. But it is divided into 2 datasets of 500 elements each. I have also used sc.parallelize(data, 10), which gives 10 datasets of size 100: 8 datasets execute on one node and 2 on the other. How can I check how many cores are being used to complete the tasks for those 8 datasets? (Is there a command or UI to check that?) Regards, Naveen. From: holden.ka...@gmail.com [mailto:holden.ka...@gmail.com] On Behalf Of Holden Karau Sent: Friday, November 07, 2014 12:46 PM To: Naveen Kumar Pokala Cc: user@spark.apache.org Subject: Re: Parallelize on spark context Hi Naveen, By default, when we call parallelize, the data is split into the default number of partitions (which we can control with the property spark.default.parallelism). If we just want a specific call to parallelize to use a different number of partitions, we can instead call sc.parallelize(data, numpartitions). The default value is documented at http://spark.apache.org/docs/latest/configuration.html#spark-properties Cheers, Holden :) On Thu, Nov 6, 2014 at 10:43 PM, Naveen Kumar Pokala <npok...@spcapitaliq.com> wrote: Hi, JavaRDD<Integer> distData = sc.parallelize(data); On what basis does parallelize split the data into multiple datasets? How can we control how many datasets are executed per executor? For example, my data is a list of 1000 integers and I have a 2-node YARN cluster. It is dividing the data into 2 batches of size 500. Regards, Naveen.
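The sizes mentioned in this thread (2 x 500, 10 x 100, and "around 20 or 21" for 48 slices) are consistent with cutting the list into contiguous ranges whose boundaries are i * length / numSlices. The sketch below reproduces that arithmetic in plain Java. It is an illustration of the boundary formula under that assumption, not Spark's actual code, and SliceSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

class SliceSketch {
    // Sizes of the slices obtained when `length` elements are cut into `numSlices`
    // contiguous ranges [i * length / numSlices, (i + 1) * length / numSlices).
    static List<Integer> sliceSizes(int length, int numSlices) {
        if (numSlices < 1) {
            throw new IllegalArgumentException("numSlices must be >= 1");
        }
        List<Integer> sizes = new ArrayList<Integer>();
        for (int i = 0; i < numSlices; i++) {
            // long arithmetic avoids int overflow for large collections
            int start = (int) ((long) i * length / numSlices);
            int end = (int) ((long) (i + 1) * length / numSlices);
            sizes.add(end - start);
        }
        return sizes;
    }

    public static void main(String[] args) {
        System.out.println(sliceSizes(1000, 2));   // two slices
        System.out.println(sliceSizes(1000, 10));  // ten slices
        System.out.println(sliceSizes(1000, 48));  // 48 slices of 20 or 21 elements
    }
}
```

With this formula the number of slices, not the number of nodes, decides the batch sizes, so passing the slice count explicitly to sc.parallelize(data, n) is the direct way to get 48 batches instead of 2.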
Parallelize on spark context
Hi, JavaRDD<Integer> distData = sc.parallelize(data); On what basis does parallelize split the data into multiple datasets? How can we control how many datasets are executed per executor? For example, my data is a list of 1000 integers and I have a 2-node YARN cluster. It is dividing the data into 2 batches of size 500. Regards, Naveen.
Nesting RDD
Hi, I am trying to execute a sample program that nests one RDD inside the transformations of another. It throws a NullPointerException. Any solution or alternative would be helpful. Thanks & regards, Naveen.
Number cores split up
Hi, I have a 2-node YARN cluster and I am using Spark 1.1.0 to submit my tasks. According to the Spark documentation, the number of cores used is the maximum number of cores available. Does that mean each node creates as many threads as it has cores to process the work assigned to that node? For example: List<Integer> data = new ArrayList<Integer>(); for (int i = 0; i < 1000; i++) data.add(i); JavaRDD<Integer> distData = sc.parallelize(data); distData = distData.map(new Function<Integer, Integer>() { @Override public Integer call(Integer arg0) throws Exception { return arg0 * arg0; } }); distData.count(); The above program divides my RDD into 2 batches of size 500 and submits them to the executors. 1) So each executor will use all the cores of its node's CPU to process a batch of 500, am I right? 2) If so, does each executor use multithreading? Is the execution on a node parallel or sequential? 3) How can I check how many cores an executor is using to process my jobs? 4) Is there any way to control how the batches are divided among the nodes? Please give some clarity on the above. Thanks & Regards, Naveen
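As general background for questions 1 and 2: in Spark each partition is processed by one task, a task runs on a single thread, and an executor runs several tasks at once only when it has several partitions to work on. The plain-Java sketch below imitates that model with a fixed thread pool standing in for an executor's cores. It is an analogy under those assumptions, not Spark code, and TaskPerPartitionSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TaskPerPartitionSketch {
    // Sums the squares of all elements, submitting one task per partition
    // to a pool of `cores` threads. Each partition is walked sequentially
    // by whichever single thread picks up its task.
    static long sumOfSquares(List<List<Integer>> partitions, int cores) {
        ExecutorService pool = Executors.newFixedThreadPool(cores);
        try {
            List<Future<Long>> results = new ArrayList<Future<Long>>();
            for (final List<Integer> part : partitions) {
                results.add(pool.submit(new Callable<Long>() {
                    @Override
                    public Long call() {
                        long sum = 0;
                        for (int x : part) {
                            sum += (long) x * x;
                        }
                        return sum;
                    }
                }));
            }
            long total = 0;
            for (Future<Long> f : results) {
                total += f.get();
            }
            return total;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Two partitions of 500 elements each, as in the post above.
        List<List<Integer>> partitions = new ArrayList<List<Integer>>();
        for (int p = 0; p < 2; p++) {
            List<Integer> part = new ArrayList<Integer>();
            for (int i = p * 500; i < (p + 1) * 500; i++) {
                part.add(i);
            }
            partitions.add(part);
        }
        // Even with 24 threads available, only 2 tasks exist, so at most 2 threads do work.
        System.out.println(sumOfSquares(partitions, 24));
    }
}
```

Under this model, a batch of 500 is not spread over all cores of a node; more partitions are needed before more cores can be kept busy.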
Spark Debugging
Hi, I have installed a 2-node Hadoop cluster (for example, on Unix machines A and B: A is the master node and a data node, B is a data node). I submit my driver programs through Spark 1.1.0 with bin/spark-submit from a PuTTY client on my Windows machine. I want to debug my program from Eclipse on my local machine, but I cannot find a way to do so. Please let me know how to debug my driver program as well as the executor programs. Naveen Kumar Pokala