Re: using pyspark with standalone cluster
If you want to submit applications to a remote cluster where port 7077 is open publicly, you need to set spark.driver.host (to the public IP of your laptop) and spark.driver.port (optional, if there is no firewall between your laptop and the remote cluster). Keeping port 7077 open to the public is a bad idea; you can read more here: https://www.sigmoid.com/securing-apache-spark-cluster/

Thanks
Best Regards

On Mon, Jun 1, 2015 at 11:48 PM, AlexG swift...@gmail.com wrote:

I've followed the instructions for setting up a standalone Spark cluster (on EC2):
- install Spark on all the machines
- enable passwordless ssh
- set up the conf/slaves file
- start the master and slaves with the provided scripts

The status on port 8080 of the master tells me that the master and executors are all running. I can successfully use pyspark from the master. However, if I try to call pyspark remotely from my laptop, with MASTER=spark://ip:7077 pyspark, I get these errors:

15/06/01 10:02:14 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@IP:7077/user/Master...
15/06/01 10:02:34 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@IP:7077/user/Master...
15/06/01 10:02:54 INFO AppClient$ClientActor: Connecting to master akka.tcp://sparkMaster@IP:7077/user/Master...
15/06/01 10:03:14 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
15/06/01 10:03:14 ERROR TaskSchedulerImpl: Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.

Any idea what's going on here? I set port 7077 to be publicly accessible...

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/using-pyspark-with-standalone-cluster-tp23099.html
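A minimal sketch of what that looks like from the laptop side (the addresses and the driver port below are placeholders, not values from this thread):

  MASTER=spark://master-public-ip:7077 pyspark \
    --conf spark.driver.host=laptop-public-ip \
    --conf spark.driver.port=7078   # only needed when a firewall limits usable ports

The executors and the master connect back to the driver, so spark.driver.host must be an address the cluster machines can actually reach.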
Insert overwrite to hive - ArrayIndexOutOfBoundsException
Hi, I am using Spark 1.3.1. I tried to insert (a new partition) into an existing partitioned Hive table, but got an ArrayIndexOutOfBoundsException. Below is a code snippet and the debug log. Any suggestions please.

case class Record4Dim(key: String, date: Int, hh: Int, x: Int, y: Int, z: Int, height: Float,
  u: Float, v: Float, w: Float, ph: Float, phb: Float, t: Float, p: Float, pb: Float,
  qvapor: Float, qgraup: Float, qnice: Float, qnrain: Float, tke_pbl: Float, el_pbl: Float)

def flatKeyFromWrf(x: (String, (Map[String,Float], Float))): Record4Dim = { }

val varWHeightFlatRDD = varWHeightRDD.map(FlatMapUtilClass().flatKeyFromWrf).toDF()
varWHeightFlatRDD.registerTempTable("table_4Dim")
for (zz <- 1 to 51)
  hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + ZONE + ",z=" + zz + ",year=" + YEAR + ",month=" + MONTH + ") " +
    "select date, hh, x, y, height, u, v, w, ph, phb, t, pb, pb, qvapor, qgraup, qnice, tke_pbl, el_pbl from table_4Dim where z=" + zz)

15/06/01 21:07:20 DEBUG YarnHistoryService: Enqueue [1433192840040]: SparkListenerTaskEnd(4,0,ResultTask,ExceptionFailure(java.lang.ArrayIndexOutOfBoundsException,18,[Ljava.lang.StackTraceElement;@5783ce22,java.lang.ArrayIndexOutOfBoundsException: 18
at org.apache.spark.sql.catalyst.expressions.GenericRow.isNullAt(rows.scala:79)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:103)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1$1.apply(InsertIntoHiveTable.scala:100)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:100)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:82)
at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:82)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Best,
Patcharee
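An observation on the snippet above (mine, not confirmed in the thread): the exception is thrown at row index 18, i.e. when InsertIntoHiveTable asks for a 19th output column of a row that only has 18. The SELECT list above contains exactly 18 columns, with pb listed twice and p and qnrain missing relative to Record4Dim. If the target table expects 19 data columns, a corrected statement would look roughly like:

  hiveContext.sql("INSERT OVERWRITE table 4dim partition (zone=" + ZONE + ",z=" + zz + ",year=" + YEAR + ",month=" + MONTH + ") " +
    "select date, hh, x, y, height, u, v, w, ph, phb, t, p, pb, qvapor, qgraup, qnice, qnrain, tke_pbl, el_pbl from table_4Dim where z=" + zz)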
Union data type
Hi, As per the doc https://github.com/databricks/spark-avro/blob/master/README.md, the union type doesn't support all kinds of combinations. Is there any plan to support a union type of string and long in the near future?

Thanks
Shagun Agarwal
spark sql - reading data from sql tables having space in column names
Hi,

We are using Spark SQL (1.3.1) to load data from Microsoft SQL Server using JDBC (as described in https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases). It is working fine except when there is a space in a column name (we can't modify the schemas to remove the spaces, as it is a legacy database).

Sqoop is able to handle such scenarios by enclosing column names in '[ ]' - the method recommended by Microsoft SQL Server (https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java - line 319).

Is there a way to handle this in Spark SQL?

Thanks,
sachin
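One possible workaround (a sketch, untested against SQL Server; the URL, table, and column names below are made up): since the dbtable option accepts anything valid in a FROM clause, a subquery can alias the bracketed names to space-free ones, so the bracket quoting happens inside SQL Server and Spark only ever sees the aliases:

  val df = sqlContext.load("jdbc", Map(
    "url" -> "jdbc:sqlserver://host;databaseName=db;user=...;password=...",
    "dbtable" -> "(SELECT [Order Id] AS order_id, [Customer Name] AS customer_name FROM LegacyOrders) t"))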
Re: flatMap output on disk / flatMap memory overhead
You could try rdd.persist(MEMORY_AND_DISK/DISK_ONLY).flatMap(...). I think StorageLevel MEMORY_AND_DISK means Spark will try to keep the data in memory, and if there isn't sufficient space then it will be shipped to disk.

Thanks
Best Regards

On Mon, Jun 1, 2015 at 11:02 PM, octavian.ganea octavian.ga...@inf.ethz.ch wrote:

Hi,

Is there any way to force the output RDD of a flatMap op to be stored in both memory and disk as it is computed? My RAM would not be able to fit the entire output of flatMap, so it really needs to start using disk after the RAM gets full. I didn't find any way to force this.

Also, what is the memory overhead of flatMap? From my computations, the output RDD should fit in memory, but I get the following error after a while (and I know it's because of memory issues, since running the program with 1/3 of the input data finishes successfully):

15/06/01 19:02:49 ERROR BlockFetcherIterator$BasicBlockFetcherIterator: Could not get block(s) from ConnectionManagerId(dco-node036-mgt.dco.ethz.ch,57478)
java.io.IOException: sendMessageReliably failed because ack was not received within 60 sec
at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:866)
at org.apache.spark.network.ConnectionManager$$anon$10$$anonfun$run$15.apply(ConnectionManager.scala:865)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.network.ConnectionManager$$anon$10.run(ConnectionManager.scala:865)
at io.netty.util.HashedWheelTimer$HashedWheelTimeout.expire(HashedWheelTimer.java:581)
at io.netty.util.HashedWheelTimer$HashedWheelBucket.expireTimeouts(HashedWheelTimer.java:656)
at io.netty.util.HashedWheelTimer$Worker.run(HashedWheelTimer.java:367)
at java.lang.Thread.run(Thread.java:745)

Also, I've seen this: https://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence but my understanding is that one should apply something like rdd.flatMap(...).persist(MEMORY_AND_DISK), which assumes that the entire output of flatMap is first stored in memory (which is not possible in my case) and, only when it's done, is stored on the disk. Please correct me if I'm wrong. Anyway, I've tried using this, but I got the same error.

My config:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "125g")
conf.set("spark.shuffle.file.buffer.kb", "1000")
conf.set("spark.shuffle.consolidateFiles", "true")

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098.html
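A minimal sketch of the suggestion (expand is a placeholder for the poster's own flatMap logic):

  import org.apache.spark.storage.StorageLevel

  val out = rdd
    .persist(StorageLevel.MEMORY_AND_DISK)  // or StorageLevel.DISK_ONLY
    .flatMap(x => expand(x))

Note this persists the input of the flatMap, not its output: partitions of rdd that don't fit in memory are spilled to disk instead of being recomputed, while the flatMap output is streamed through to the following stage.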
Re: HDFS Rest Service not available
It says your namenode is down (connection refused on 8020). You can restart HDFS by going into the hadoop directory and typing sbin/stop-dfs.sh and then sbin/start-dfs.sh

Thanks
Best Regards

On Tue, Jun 2, 2015 at 5:03 AM, Su She suhsheka...@gmail.com wrote:

Hello All,

A bit scared I did something stupid... I killed a few PIDs that were listening to ports 2183 (kafka) and 4042 (spark app); some of the PIDs didn't even seem to be stopped, as they are still running when I do lsof -i:[port number].

I'm not sure if the problem started after or before I did these kill commands, but I now can't connect to HDFS or start Spark. I can't seem to access Hue. I am afraid I accidentally killed an important process related to HDFS. But I am not sure what it would be, as I couldn't even kill the PIDs. Is it a coincidence that HDFS failed randomly? Is it likely that I killed an important PID? How can I restart HDFS? Thanks a lot!

Error on Hue:

Cannot access: /user/ec2-user. The HDFS REST service is not available. Note: You are a Hue admin but not a HDFS superuser (which is hdfs).
HTTPConnectionPool(host='ec2-ip-address.us-west-1.compute.amazonaws.com', port=50070): Max retries exceeded with url: /webhdfs/v1/user/ec2-user?op=GETFILESTATUS&user.name=hue&doas=ec2-user (Caused by class 'socket.error': [Errno 111] Connection refused)

Error when I try to open spark-shell or a spark app:

java.net.ConnectException: Call From ip-10-0-2-216.us-west-1.compute.internal/10.0.2.216 to ip-10-0-2-216.us-west-1.compute.internal:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at org.apache.hadoop.net.NetUtils.wrapWithMessage(NetUtils.java:783)
at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:730)
at org.apache.hadoop.ipc.Client.call(Client.java:1415)
at org.apache.hadoop.ipc.Client.call(Client.java:1364)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:206)
at com.sun.proxy.$Proxy20.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getFileInfo(ClientNamenodeProtocolTranslatorPB.java:744)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy21.getFileInfo(Unknown Source)
at org.apache.hadoop.hdfs.DFSClient.getFileInfo(DFSClient.java:1921)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1089)
at org.apache.hadoop.hdfs.DistributedFileSystem$17.doCall(DistributedFileSystem.java:1085)
at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1085)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1400)
at org.apache.spark.util.FileLogger.createLogDir(FileLogger.scala:123)
at org.apache.spark.util.FileLogger.start(FileLogger.scala:115)
at org.apache.spark.scheduler.EventLoggingListener.start(EventLoggingListener.scala:74)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:353)
at org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:986)
at $iwC$$iwC.<init>(<console>:9)
at $iwC.<init>(<console>:18)
at <init>(<console>:20)
at .<init>(<console>:24)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:852)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1125)
at ...
Re: Spark stages very slow to complete
Hi, the code is some hundreds of lines of Python. I can try to compose a minimal example as soon as I find the time, though. Any ideas until then?

Would you mind posting the code?

On 2 Jun 2015 00:53, Karlson ksonsp...@siberie.de wrote:

Hi,

In all (pyspark) Spark jobs that become somewhat more involved, I am experiencing the issue that some stages take a very long time to complete and sometimes don't at all. This clearly correlates with the size of my input data. Looking at the stage details for one such stage, I am wondering where Spark spends all this time. Take this table of the stage's task metrics for example:

Metric                     Min          25th percentile  Median       75th percentile  Max
Duration                   1.4 min      1.5 min          1.7 min      1.9 min          2.3 min
Scheduler Delay            1 ms         3 ms             4 ms         5 ms             23 ms
Task Deserialization Time  1 ms         2 ms             3 ms         8 ms             22 ms
GC Time                    0 ms         0 ms             0 ms         0 ms             0 ms
Result Serialization Time  0 ms         0 ms             0 ms         0 ms             1 ms
Getting Result Time        0 ms         0 ms             0 ms         0 ms             0 ms
Input Size / Records       23.9 KB / 1  24.0 KB / 1      24.1 KB / 1  24.1 KB / 1      24.3 KB / 1

Why is the overall duration almost 2 min? Where is all this time spent, when no progress of the stages is visible? The progress bar simply displays 0 succeeded tasks for a very long time before sometimes slowly progressing. Also, the name of the stage displayed above is `javaToPython at null:-1`, which I find very uninformative. I don't even know which action exactly is responsible for this stage. Does anyone experience similar issues or have any advice for me?

Thanks!
Re: What is shuffle read and what is shuffle write ?
I found an interesting presentation, http://www.slideshare.net/colorant/spark-shuffle-introduction, and went through this thread as well: http://apache-spark-user-list.1001560.n3.nabble.com/How-does-shuffle-work-in-spark-td584.html

Thanks
Best Regards

On Tue, Jun 2, 2015 at 3:06 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:

Is it input and output bytes/record size?

--
Deepak
Re: Spark 1.3.1 bundle does not build - unresolved dependency
You can try to skip the tests; try with: mvn -Dhadoop.version=2.4.0 -Pyarn -DskipTests clean package

Thanks
Best Regards

On Tue, Jun 2, 2015 at 2:51 AM, Stephen Boesch java...@gmail.com wrote:

I downloaded the 1.3.1 distro tarball:

$ ll ../spark-1.3.1.tar.gz
-rw-r--r--@ 1 steve staff 8500861 Apr 23 09:58 ../spark-1.3.1.tar.gz

However the build on it is failing with an unresolved dependency: "configuration not public"

$ build/sbt assembly -Dhadoop.version=2.5.2 -Pyarn -Phadoop-2.4

[error] (network-shuffle/*:update) sbt.ResolveException: unresolved dependency: org.apache.spark#spark-network-common_2.10;1.3.1: configuration not public in org.apache.spark#spark-network-common_2.10;1.3.1: 'test'. It was required from org.apache.spark#spark-network-shuffle_2.10;1.3.1 test

Is there a known workaround for this?

thanks
Re: flatMap output on disk / flatMap memory overhead
I tried using reduceByKey, without success.

I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However, I got the same error as before, namely the error described here: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur in a set of documents at least 5 times. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs that are spilled by flatMap might need to be stored on the disk, but I don't understand why the persist option does not work and my job fails.

My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1,word2), 1)
  .reduceByKey((a,b) => (a + b).toShort)
  .filter({case((x,y),count) => count >= 5})

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One node I keep for the master, 7 nodes for the workers.

my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:

ulimit -n 20
SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1

Thank you a lot for your help!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
Re: Streaming K-medoids
Erik,

Thank you for your answer. It seems really good, but unfortunately I'm not very familiar with Scala, so I have only partly understood it. Could you please explain your idea with a Spark implementation?

Best regards,
Marko

On Mon 01 Jun 2015 06:35:17 PM CEST, Erik Erlandson wrote:

I haven't given any thought to streaming it, but in case it's useful I do have a k-medoids implementation for Spark: http://silex.freevariable.com/latest/api/#com.redhat.et.silex.cluster.KMedoids

Also a blog post about multi-threading it: http://erikerlandson.github.io/blog/2015/05/06/parallel-k-medoids-using-scala-parseq/

----- Original Message -----

Hello everyone,

I have an idea and I would like to get validation from the community about it. In Mahout there is an implementation of streaming k-means. I'm interested in your opinion: would it make sense to make a similar implementation of streaming k-medoids? K-medoids has even bigger problems than k-means because it's not scalable, but it can be useful in some cases (e.g. it allows more sophisticated distance measures). What is your opinion about such an approach? Does anyone see problems with it?

Best regards,
Marko
Spark 1.4.0-rc3: Actor not found
Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster mode), the initial stage starts, but the job fails before any task succeeds, with the following error. Any hints?

[ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/), Path(/user/OutputCommitCoordinator)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508)
at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541)
at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531)
at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87)
at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
What is shuffle read and what is shuffle write ?
Is it input and output bytes/record size?

--
Deepak
Re: Embedding your own transformer in Spark.ml Pipleline
Hi Dimple,

Take a look at the existing transformers:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala

(*they are for spark-1.4)

The idea is just to implement a class that extends Transformer with HasInputCol with HasOutputCol (if yours is a 1:1 column transformer) and has a def transform(dataset: DataFrame): DataFrame method.

Thanks,
Peter

On 2015-06-02 20:19, dimple wrote:

Hi, I would like to embed my own transformer in the Spark.ml Pipeline but do not see an example of it. Can someone share an example of which classes/interfaces I need to extend/implement in order to do so? Thanks.

Dimple

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
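For the 1:1 case, a minimal sketch against the 1.4-era API linked above (the class name and the lower-casing logic are made up for illustration):

  import org.apache.spark.ml.UnaryTransformer
  import org.apache.spark.ml.util.Identifiable
  import org.apache.spark.sql.types.{DataType, StringType}

  // hypothetical transformer that lower-cases a string column
  class LowerCaser(override val uid: String)
      extends UnaryTransformer[String, String, LowerCaser] {
    def this() = this(Identifiable.randomUID("lower"))
    // the function applied to every value of the input column
    override protected def createTransformFunc: String => String = _.toLowerCase
    // declares the type of the output column
    override protected def outputDataType: DataType = StringType
  }

Something like new LowerCaser().setInputCol("text").setOutputCol("text_lc") can then be placed in a Pipeline's stages like any built-in transformer.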
Re: flatMap output on disk / flatMap memory overhead
Are you sure it's memory related? What is the disk utilization and IO performance on the workers? The error you posted looks to be related to shuffle trying to obtain block data from another worker node and failing to do so in a reasonable amount of time. It may still be memory related, but I'm not sure that other resources are ruled out yet.

On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote:

I tried using reduceByKey, without success.

I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However, I got the same error as before, namely the error described here: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur in a set of documents at least 5 times. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs that are spilled by flatMap might need to be stored on the disk, but I don't understand why the persist option does not work and my job fails.

My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1,word2), 1)
  .reduceByKey((a,b) => (a + b).toShort)
  .filter({case((x,y),count) => count >= 5})

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One node I keep for the master, 7 nodes for the workers.

my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:

ulimit -n 20
SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1

Thank you a lot for your help!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
Re: updateStateByKey and kafka direct approach without shuffle
If you're using the spark partition id directly as the key, then you don't need to access offset ranges at all, right? You can create a single instance of a partitioner in advance, and all it needs to know is the number of partitions (which is just the count of all the kafka topic/partitions).

On Tue, Jun 2, 2015 at 12:40 PM, Krot Viacheslav krot.vyaches...@gmail.com wrote:

Cody,

Thanks, good point. I fixed getting the partition id to:

class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
  override def numPartitions: Int = offsetRanges.size
  override def getPartition(key: Any): Int = {
    // this is set in .map(m => (TaskContext.get().partitionId(), m.value))
    key.asInstanceOf[Int]
  }
}

inputStream
  .map(m => (TaskContext.get().partitionId(), m.value))
  .transform { rdd =>
    val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    new ProxyRDDWithPartitioner(rdd, part)
  }
  ...

But how can I create the same partitioner during the updateStateByKey call? I have no idea how to access the rdd when calling updateStateByKey.

On Tue, Jun 2, 2015 at 19:15, Cody Koeninger c...@koeninger.org wrote:

I think the general idea is worth pursuing. However, this line:

override def getPartition(key: Any): Int = {
  key.asInstanceOf[(String, Int)]._2
}

is using the kafka partition id, not the spark partition index, so it's going to give you fewer partitions / an incorrect index.

Cast the rdd to HasOffsetRanges and get the offsetRanges from it. The index into the offset range array matches the (spark) partition id. That will also tell you what the value of numPartitions should be.

On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav krot.vyaches...@gmail.com wrote:

Hi all,

In my streaming job I'm using the kafka streaming direct approach and want to maintain state with updateStateByKey. My PairRDD has the message's topic name + partition id as a key. So, I assume that updateStateByKey could work within the same partition as the KafkaRDD and not lead to shuffles. Actually this is not true, because updateStateByKey leads to a cogroup transformation that thinks the state rdd and the kafka rdd are not co-partitioned, as the kafka rdd does not have a partitioner at all. So, the dependency is considered to be wide and leads to a shuffle.

I tried to avoid shuffling by providing a custom partitioner to updateStateByKey, but the KafkaRDD needs to use the same partitioner. For this I created a proxy RDD that just returns my partitioner:

class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner) extends RDD[T](prev) {
  override val partitioner = Some(part)
  override def compute(split: Partition, context: TaskContext): Iterator[T] = prev.compute(split, context)
  override protected def getPartitions: Array[Partition] = prev.partitions
  override def getPreferredLocations(thePart: Partition): Seq[String] = prev.preferredLocations(thePart)
}

I use it as:

val partitioner = new Partitioner {
  // TODO this should be retrieved from kafka
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = {
    key.asInstanceOf[(String, Int)]._2
  }
}

inputStream
  .map(m => ((m.topic, m.partition), m.value))
  .transform(new ProxyRDDWithPartitioner(_, partitioner))
  .updateStateByKey(func, partitioner)

The question is - is it safe to do such a trick?
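A sketch of Cody's suggestion (assuming the total count of kafka topic/partitions is known up front; the class name is made up):

  import org.apache.spark.Partitioner

  // one instance created on the driver; numPartitions is the count of all
  // kafka topic/partitions the stream reads from
  class SparkPartitionIdPartitioner(override val numPartitions: Int) extends Partitioner {
    // keys are already spark partition ids, set via TaskContext.get().partitionId()
    override def getPartition(key: Any): Int = key.asInstanceOf[Int]
  }

Because the same instance can be passed both to the transform that wraps the KafkaRDD and to updateStateByKey, the two RDDs compare as co-partitioned and the cogroup becomes a narrow dependency.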
Re: Spark 1.4.0-rc3: Actor not found
Does it happen every time you read a parquet source?

On Tue, Jun 2, 2015 at 3:42 AM, Anders Arpteg arp...@spotify.com wrote:

The log is from the log aggregation tool (hortonworks, yarn logs ...), so both executors and driver. I'll send a private mail to you with the full logs. Also, I tried another job as you suggested, and it actually worked fine. The first job was reading from a parquet source, and the second from an avro source. Could there be some issues with the parquet reader?

Thanks,
Anders

On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu zsxw...@gmail.com wrote:

How about other jobs? Is it an executor log, or a driver log? Could you post other logs near this error, please? Thank you.

Best Regards,
Shixiong Zhu

2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com:

Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster mode), the initial stage starts, but the job fails before any task succeeds, with the following error. Any hints?

[ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace)
Exception in thread "main" akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/), Path(/user/OutputCommitCoordinator)]
at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65)
... (full stack trace in the original message above)
Re: Can't build Spark 1.3
It did hang for me too, with high RAM consumption during the build. I had to free a lot of RAM and add swap memory just to get it built on my 3rd attempt. Everything else looks fine. You can download the prebuilt versions from the Spark homepage to save yourself all this trouble.

Thanks,
Ritesh
Re: HDFS Rest Service not available
Ahh, this did the trick. I had to get the namenode out of safe mode, however, before it fully worked. Thanks!

On Tue, Jun 2, 2015 at 12:09 AM, Akhil Das ak...@sigmoidanalytics.com wrote:

It says your namenode is down (connection refused on 8020). You can restart HDFS by going into the hadoop directory and typing sbin/stop-dfs.sh and then sbin/start-dfs.sh

Thanks
Best Regards

On Tue, Jun 2, 2015 at 5:03 AM, Su She suhsheka...@gmail.com wrote:

Hello All,

A bit scared I did something stupid... I killed a few PIDs that were listening to ports 2183 (kafka) and 4042 (spark app); some of the PIDs didn't even seem to be stopped, as they are still running when I do lsof -i:[port number].

I'm not sure if the problem started after or before I did these kill commands, but I now can't connect to HDFS or start Spark. I can't seem to access Hue. I am afraid I accidentally killed an important process related to HDFS. But I am not sure what it would be, as I couldn't even kill the PIDs. Is it a coincidence that HDFS failed randomly? Is it likely that I killed an important PID? How can I restart HDFS? Thanks a lot!

Error on Hue:

Cannot access: /user/ec2-user. The HDFS REST service is not available. Note: You are a Hue admin but not a HDFS superuser (which is hdfs).
HTTPConnectionPool(host='ec2-ip-address.us-west-1.compute.amazonaws.com', port=50070): Max retries exceeded with url: /webhdfs/v1/user/ec2-user?op=GETFILESTATUS&user.name=hue&doas=ec2-user (Caused by class 'socket.error': [Errno 111] Connection refused)

Error when I try to open spark-shell or a spark app:

java.net.ConnectException: Call From ip-10-0-2-216.us-west-1.compute.internal/10.0.2.216 to ip-10-0-2-216.us-west-1.compute.internal:8020 failed on connection exception: java.net.ConnectException: Connection refused; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
... (full stack trace in the original message above)
Re: data localisation in spark
So in Spark, after acquiring executors from the cluster manager, are tasks scheduled on executors based on data locality? I mean, if in an application there are 2 jobs and the output of job 1 is used as the input of job 2, and in job 1 I persisted some RDD, then while running job 2 will it use the same executors where job 1's output was persisted, or does it acquire executors again so that data movement happens?

And is it true that the number of executors in an application is fixed, acquired at the start of the application, and remains the same throughout the application? If yes, how does it take care of an explicit number of reducers in some of the APIs, say rdd.reduceByKey(func, 10)? When converting the DAG to stages, does it calculate the executors required and then acquire executors/worker nodes?

On Tue, Jun 2, 2015 at 11:06 PM, Sandy Ryza sandy.r...@cloudera.com wrote:

It is not possible with JavaSparkContext either. The API mentioned below currently does not have any effect (we should document this). The primary difference between MR and Spark here is that MR runs each task in its own YARN container, while Spark runs multiple tasks within an executor, which needs to be requested before Spark knows what tasks it will run. Although dynamic allocation improves that last part.

-Sandy

On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com wrote:

Is it possible with JavaSparkContext?

JavaSparkContext jsc = new JavaSparkContext(conf);
JavaRDD<String> lines = jsc.textFile(args[0]);

If yes, is it the programmer's responsibility to first calculate the split locations and then instantiate the Spark context with preferred locations? How is this achieved in MR2 with YARN, where the ApplicationMaster specifies split locations to the ResourceManager before acquiring the node managers?

On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote:

Take a look at the following SparkContext constructor variant that tries to honor data locality in YARN mode.

/**
 * :: DeveloperApi ::
 * Alternative constructor for setting preferred locations where Spark will create executors.
 *
 * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on.
 * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]]
 * from a list of input files or InputFormats for the application.
 */
@DeveloperApi
def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = {
  this(config)
  this.preferredNodeLocationData = preferredNodeLocationData
}

--
bit1...@163.com

From: Shushant Arora shushantaror...@gmail.com
Date: 2015-05-31 22:54
To: user user@spark.apache.org
Subject: data localisation in spark

I want to understand how Spark takes care of data localisation in cluster mode when run on YARN.

1. The driver program asks the ResourceManager for executors. Does it tell YARN's RM to check the HDFS blocks of the input data and then allocate executors accordingly? And do executors remain fixed throughout the application, or does the driver program ask for new executors when it submits another job in the same application, since in Spark a new job is created for each action? If executors are fixed, then for the second job achieving data locality is impossible?

2. When executors are done with their processing, are they marked as free in the ResourceManager's resource queue, and do the executors tell this to the RM directly instead of via the driver?

Thanks
Shushant
Can't build Spark 1.3
I downloaded the latest Spark (1.3) from github. Then I tried to build it.

First for Scala 2.10 (and Hadoop 2.4):

build/mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -DskipTests clean package

That resulted in a hang-up after printing a bunch of lines like:

[INFO] Dependency-reduced POM written at ...

Then I tried for Scala 2.11:

mvn -Pyarn -Phadoop-2.4 -Dscala-2.11 -DskipTests clean package

That resulted in multiple compilation errors. What I actually want is:

mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package

Is it only me who can't build Spark 1.3? And is there any site to download Spark prebuilt for Hadoop 2.5 and Hive?

Thank you for any help.

Alexey
DataFrames coming in SparkR in Apache Spark 1.4.0
For the impatient R user, here is a link to get started working with DataFrames using SparkR: http://people.apache.org/~pwendell/spark-nightly/spark-1.4-docs/latest/sparkr.html

Happy coding,

Daniel

-----
Daniel Emaasit, Ph.D. Research Assistant
Transportation Research Center (TRC)
University of Nevada, Las Vegas
Las Vegas, NV 89154-4015
Cell: 615-649-2489
www.danielemaasit.com

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/DataFrames-coming-in-SparkR-in-Apache-Spark-1-4-0-tp23116.html
Re: updateStateByKey and kafka direct approach without shuffle
Thanks, this works. Hopefully I didn't miss something important with this approach.

On Tue, Jun 2, 2015 at 20:15, Cody Koeninger c...@koeninger.org wrote:

If you're using the spark partition id directly as the key, then you don't need to access offset ranges at all, right? You can create a single instance of a partitioner in advance, and all it needs to know is the number of partitions (which is just the count of all the kafka topic/partitions).

On Tue, Jun 2, 2015 at 12:40 PM, Krot Viacheslav krot.vyaches...@gmail.com wrote:

Cody,

Thanks, good point. I fixed getting the partition id to:

class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner {
  override def numPartitions: Int = offsetRanges.size
  override def getPartition(key: Any): Int = {
    // this is set in .map(m => (TaskContext.get().partitionId(), m.value))
    key.asInstanceOf[Int]
  }
}

inputStream
  .map(m => (TaskContext.get().partitionId(), m.value))
  .transform { rdd =>
    val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
    new ProxyRDDWithPartitioner(rdd, part)
  }
  ...

But how can I create the same partitioner during the updateStateByKey call? I have no idea how to access the rdd when calling updateStateByKey.

On Tue, Jun 2, 2015 at 19:15, Cody Koeninger c...@koeninger.org wrote:

I think the general idea is worth pursuing. However, this line:

override def getPartition(key: Any): Int = {
  key.asInstanceOf[(String, Int)]._2
}

is using the kafka partition id, not the spark partition index, so it's going to give you fewer partitions / an incorrect index.

Cast the rdd to HasOffsetRanges and get the offsetRanges from it. The index into the offset range array matches the (spark) partition id. That will also tell you what the value of numPartitions should be.

On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav krot.vyaches...@gmail.com wrote:

... (original message with the ProxyRDDWithPartitioner code quoted in full earlier in this thread)
Re: Embedding your own transformer in Spark.ml Pipleline
Thanks, Peter. Can you share the Tokenizer.java class for Spark 1.2.1?

Dimple

On Tue, Jun 2, 2015 at 10:51 AM, Peter Rudenko petro.rude...@gmail.com wrote:

Hi Dimple,

Take a look at the existing transformers:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala

(*they are for spark-1.4)

The idea is just to implement a class that extends Transformer with HasInputCol with HasOutputCol (if yours is a 1:1 column transformer) and has a def transform(dataset: DataFrame): DataFrame method.

Thanks,
Peter

On 2015-06-02 20:19, dimple wrote:

Hi, I would like to embed my own transformer in the Spark.ml Pipeline but do not see an example of it. Can someone share an example of which classes/interfaces I need to extend/implement in order to do so? Thanks.

Dimple

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
Re: spark sql - reading data from sql tables having space in column names
I am having the same problem reading JSON. There does not seem to be a way of selecting a field that has a space, "Executor Info" from the Spark logs. I suggest that we open a JIRA ticket to address this issue.

On Jun 2, 2015 10:08 AM, ayan guha guha.a...@gmail.com wrote:

I would think the easiest way would be to create a view in the DB with column names without spaces. In fact, you can pass a SQL query in place of a real table. From the documentation:

"The JDBC table that should be read. Note that anything that is valid in a `FROM` clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses."

Kindly let the community know if this works.

On Tue, Jun 2, 2015 at 6:43 PM, Sachin Goyal sachin.go...@jabong.com wrote:

Hi,

We are using Spark SQL (1.3.1) to load data from Microsoft SQL Server using JDBC (as described in https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases). It is working fine except when there is a space in a column name (we can't modify the schemas to remove the spaces, as it is a legacy database).

Sqoop is able to handle such scenarios by enclosing column names in '[ ]' - the method recommended by Microsoft SQL Server (https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java - line 319).

Is there a way to handle this in Spark SQL?

Thanks,
sachin

--
Best Regards,
Ayan Guha
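For the JSON case specifically, two things may already work (a sketch, not verified on 1.3; the file and table names are made up, and the SQL variant assumes a HiveContext so backtick quoting is accepted):

  val df = sqlContext.jsonFile("events.json")   // hypothetical input
  df.select(df("Executor Info")).show()         // DataFrame column lookup by exact name
  df.registerTempTable("events")
  sqlContext.sql("SELECT `Executor Info` FROM events")

The DataFrame apply-style lookup takes the raw column name, so the space is not a parsing problem there; the backticks handle the SQL-string path.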
[OFFTOPIC] Big Data Application Meetup
Hi everyone,

I wanted to drop a note about a newly organized developer meetup in the Bay Area, the Big Data Application Meetup (http://meetup.com/bigdataapps), and a call for speakers. The plan is for meetup topics to be focused on application use-cases: how developers can build end-to-end solutions with open-source big data technologies. If you want to share your experience, please email me back to become a speaker. If you have any questions, I will be happy to answer them.

We plan for the first event to be hosted by Cask at its HQ in Palo Alto at the end of June. We will also be promoting the meetup at Hadoop Summit and Spark Summit in the following weeks.

Thank you,
Alex Baranau
Re: IDE for sparkR
RStudio is the best IDE for running SparkR. Instructions for this can be found at this link: https://github.com/apache/spark/tree/branch-1.4/R. You will need to set some environment variables as described below.

Using SparkR from RStudio

If you wish to use SparkR from RStudio or other R frontends, you will need to set some environment variables which point SparkR to your Spark installation. For example:

# Set this to where Spark is installed
Sys.setenv(SPARK_HOME="/Users/shivaram/spark")
# This line loads SparkR from the installed directory
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)
sc <- sparkR.init(master="local")

-----
Daniel Emaasit, Ph.D. Research Assistant
Transportation Research Center (TRC)
University of Nevada, Las Vegas
Las Vegas, NV 89154-4015
Cell: 615-649-2489
www.danielemaasit.com

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/IDE-for-sparkR-tp4764p23115.html
Re: Embedding your own transformer in Spark.ml Pipleline
Thanks for the quick reply, Ram. Will take a look at the Tokenizer code and try it out.

Dimple

On Tue, Jun 2, 2015 at 10:42 AM, Ram Sriharsha sriharsha@gmail.com wrote:

Hi,

We are in the process of adding examples for feature transformations (https://issues.apache.org/jira/browse/SPARK-7546) and this should be available shortly on Spark master. In the meantime, the best place to start would be to look at how the Tokenizer works here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala

You need to implement the Transformer interface as above; in this case a UnaryTransformer, since the feature transformer acts on one column, transforms it, and outputs another column.

And here is an example of how to build a pipeline that includes a feature transformer (the HashingTF is the feature transformer analogous to what you would build): https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala

But stay tuned, we should have examples in Python, Scala and Java soon.

Ram

On Tue, Jun 2, 2015 at 10:19 AM, dimple dimp201...@gmail.com wrote:

Hi, I would like to embed my own transformer in the Spark.ml Pipeline but do not see an example of it. Can someone share an example of which classes/interfaces I need to extend/implement in order to do so? Thanks.

Dimple

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
Issues with Spark Streaming and Manual Clock used for Unit Tests
I have a situation where I have multiple tests that use Spark Streaming with a manual clock. The first run is OK and processes the data when I increment the clock by the sliding window duration. The second test deviates and doesn't process any data. The traces follow; they indicate that the memory store is called right after the receiver has finished loading the data for that set. The second test only called the memory store after the batch had started processing, after the manual clock was incremented.

The following is a trace that works:

15/06/02 10:39:18 INFO ManualLoadFileBasedReceiverDnsData: Took 78913544 nanos to load data
15/06/02 10:39:18 INFO MemoryStore: ensureFreeSpace(1624896) called with curMem=14071, maxMem=2061647216
15/06/02 10:39:18 INFO MemoryStore: Block input-0-1433266758000 stored as values in memory (estimated size 1586.8 KB, free 1964.6 MB)
15/06/02 10:39:18 INFO BlockManagerInfo: Added input-0-1433266758000 in memory on localhost:54349 (size: 1586.8 KB, free: 1964.6 MB)
15/06/02 10:39:18 INFO BlockManagerMaster: Updated info of block input-0-1433266758000
15/06/02 10:39:18 INFO BlockGenerator: Pushed block input-0-1433266758000
15/06/02 10:39:37 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 2000
15/06/02 10:39:37 INFO FlatMapValuedDStream: Time 2000 ms is invalid as zeroTime is 0 ms and slideDuration is 1 ms and difference is 2000 ms
15/06/02 10:39:37 INFO JobScheduler: No jobs added for time 2000 ms
15/06/02 10:39:37 INFO JobGenerator: Checkpointing graph for time 2000 ms
15/06/02 10:39:37 INFO DStreamGraph: Updating checkpoint data for time 2000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 2000 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 4000
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 6000
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 4000 ms is invalid as zeroTime is 0 ms and slideDuration is 1 ms and difference is 4000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 4000 ms
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 6000 ms is invalid as zeroTime is 0 ms and slideDuration is 1 ms and difference is 6000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 6000 ms
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 4000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 4000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 4000 ms
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 2000 ms to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000'
15/06/02 10:39:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 2000 ms saved to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-2000', took 18291958 bytes and 87 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 8000
15/06/02 10:39:38 INFO JobGenerator: Checkpointing graph for time 6000 ms
15/06/02 10:39:38 INFO DStreamGraph: Updating checkpoint data for time 6000 ms
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 4000 ms to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000'
15/06/02 10:39:38 INFO DStreamGraph: Updated checkpoint data for time 6000 ms
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 4000 ms saved to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-4000', took 18291960 bytes and 28 ms
15/06/02 10:39:38 INFO ReceiveDataFromFileEndToEndDNSTTest: Clock time is 1
15/06/02 10:39:38 INFO CheckpointWriter: Saving checkpoint for time 6000 ms to file 'file:/Users/mobsniuk/perforce/IB/proj/analytics-dnst/platform/checkpoint/checkpoint-6000'
15/06/02 10:39:38 INFO DStreamGraph: Clearing checkpoint data for time 2000 ms
15/06/02 10:39:38 INFO DStreamGraph: Cleared checkpoint data for time 2000 ms
15/06/02 10:39:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:39:38 INFO FlatMapValuedDStream: Time 8000 ms is invalid as zeroTime is 0 ms and slideDuration is 1 ms and difference is 8000 ms
15/06/02 10:39:38 INFO JobScheduler: No jobs added for time 8000 ms
15/06/02 10:39:38 INFO DStreamGraph: Clearing checkpoint data for time 4000 ms
15/06/02 10:39:38 INFO DStreamGraph: Cleared checkpoint data for time 4000 ms
15/06/02 10:39:38 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
15/06/02 10:39:38 INFO StateDStream: Time 0 ms is invalid as zeroTime is 0 ms and slideDuration is 1 ms and difference is 0 ms
15/06/02 10:39:38 INFO FlatMappedDStream: Slicing from 2000 ms to 1 ms (aligned to 2000 ms and 1 ms)
15/06/02 10:39:38 INFO CheckpointWriter: Checkpoint for time 6000 ms saved to file
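For reference, a rough sketch of the manual-clock test pattern being described (hedged: ManualClock was private[streaming] in the 1.3-era code, so test code typically lives under the org.apache.spark.streaming package to reach it, and the scheduler field is likewise package-private; method names match that era):

  import org.apache.spark.streaming.util.ManualClock

  conf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
  val ssc = new StreamingContext(conf, Seconds(2))
  // ... wire up the input stream and start the context ...
  val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
  clock.addToTime(batchDuration.milliseconds)  // advance by one batch to trigger processing

A stale checkpoint directory between tests can also make the second run behave differently, which may be worth ruling out given the checkpoint activity in the log above.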
Re: map - reduce only with disk
You shouldn't have to persist the RDD at all, just call flatMap and reduce on it directly. If you try to persist it, that will try to load the original data into memory, but here you are only scanning through it once. Matei

On Jun 2, 2015, at 2:09 AM, Octavian Ganea octavian.ga...@inf.ethz.ch wrote:

Thanks, I was actually using reduceByKey, not groupByKey. I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However, I got the same error as before, namely the error described here: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur in a set of documents at least 5 times. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs that are spilled by flatMap might need to be stored on the disk, but I don't understand why the persist option does not work and my job fails. My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
  .reduceByKey((a, b) => (a + b).toShort)
  .filter({ case ((x, y), count) => count >= 5 })

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One node I keep for the master, 7 nodes for the workers.

my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:

ulimit -n 20
SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1

Thank you a lot for your help!

2015-06-02 4:40 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com:

As long as you don't use cache(), these operations will go from disk to disk, and will only use a fixed amount of memory to build some intermediate results. However, note that because you're using groupByKey, that needs the values for each key to all fit in memory at once. In this case, if you're going to reduce right after, you should use reduceByKey, which will be more efficient. Matei

On Jun 1, 2015, at 2:21 PM, octavian.ganea octavian.ga...@inf.ethz.ch wrote:

Dear all, Does anyone know how I can force Spark to use only the disk when doing a simple flatMap(..).groupByKey.reduce(_ + _)? Thank you!

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/map-reduce-only-with-disk-tp23102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org

-- Octavian Ganea, Research assistant at ETH Zurich, octavian.ga...@inf.ethz.ch, http://da.inf.ethz.ch/people/OctavianGanea/
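For reference, a minimal self-contained sketch of the job as Matei suggests running it, with no persist call at all (a sketch under assumptions: outputPairsOfWords here is a stand-in that emits adjacent-word pairs, and the input/output paths are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object PairCount {
  // emit ((word1, word2), 1) for adjacent words in a line; illustrative only
  def outputPairsOfWords(line: String): Seq[((String, String), Int)] = {
    val words = line.split("\\s+").filter(_.nonEmpty)
    words.sliding(2).collect { case Array(a, b) => ((a, b), 1) }.toSeq
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("pair-count"))
    sc.textFile("hdfs:///path/to/docs")      // no persist: a single scan, disk to disk
      .flatMap(outputPairsOfWords)
      .reduceByKey(_ + _)                    // combines map-side and spills to disk as needed
      .filter { case (_, count) => count >= 5 }
      .saveAsTextFile("hdfs:///path/to/out")
    sc.stop()
  }
}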
Re: Compute Median in Spark Dataframe
Nice to hear from you Holden! I ended up trying exactly that (Column) - but I may have done it wrong:

In [5]: g.agg(Column("percentile(value, 0.5)"))
Py4JError: An error occurred while calling o97.agg. Trace: py4j.Py4JException: Method agg([class java.lang.String, class scala.collection.immutable.Nil$]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

Any idea? Olivier.

Le mar. 2 juin 2015 à 18:02, Holden Karau hol...@pigscanfly.ca a écrit :

Not super easily, the GroupedData class uses a strToExpr function which has a pretty limited set of functions so we can't pass in the name of an arbitrary Hive UDAF (unless I'm missing something). We can instead construct a column with the expression you want and then pass it in to agg() that way (although then you need to call the Hive UDAF there). There are some private classes in hiveUdfs.scala which expose hiveUdaf's as Spark SQL AggregateExpressions, but they are private.

On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

I've finally come to the same conclusion, but isn't there any way to call this Hive UDAF from agg("percentile(key,0.5)")??

Le mar. 2 juin 2015 à 15:37, Yana Kadiyska yana.kadiy...@gmail.com a écrit :

Like this... sqlContext should be a HiveContext instance:

case class KeyValue(key: Int, value: String)
val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
df.registerTempTable("table")
sqlContext.sql("select percentile(key,0.5) from table").show()

On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

Hi everyone, Is there any way to compute a median on a column using Spark's DataFrame? I know you can use stats in an RDD but I'd rather stay within a DataFrame. Hive seems to imply that using ntile one can compute percentiles, quartiles and therefore a median. Does anyone have experience with this? Regards, Olivier.

-- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
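A self-contained version of Yana's approach, for reference (a minimal sketch assuming Hive support is compiled in and a local master; table and app names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object MedianViaHive {
  case class KeyValue(key: Int, value: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("median").setMaster("local[2]"))
    val sqlContext = new HiveContext(sc) // percentile is a Hive UDAF, so a HiveContext is required
    import sqlContext.implicits._

    val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF()
    df.registerTempTable("kv")
    // the median is just the 50th percentile
    sqlContext.sql("select percentile(key, 0.5) from kv").show()
    sc.stop()
  }
}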
Re: Can't build Spark 1.3
Have you run zinc during the build? See build/mvn, which installs zinc. Cheers

On Tue, Jun 2, 2015 at 12:26 PM, Ritesh Kumar Singh riteshoneinamill...@gmail.com wrote:

It did hang for me too. High RAM consumption during the build. Had to free a lot of RAM and introduce swap memory just to get it built on my 3rd attempt. Everything else looks fine. You can download the prebuilt versions from the Spark homepage to save yourself from all this trouble. Thanks, Ritesh
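For reference, the wrapper Ted mentions can drive the whole build; a typical invocation looks something like this (the flags are illustrative):

# build/mvn downloads and starts a zinc server automatically, which keeps the
# Scala compiler resident and cuts memory pressure and time on rebuilds
./build/mvn -DskipTests clean package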
Re: Compute Median in Spark Dataframe
So for column you need to pass in a Java function, I have some sample code which does this but it does terrible things to access Spark internals.

On Tuesday, June 2, 2015, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

Nice to hear from you Holden! I ended up trying exactly that (Column) - but I may have done it wrong:

In [5]: g.agg(Column("percentile(value, 0.5)"))
Py4JError: An error occurred while calling o97.agg. Trace: py4j.Py4JException: Method agg([class java.lang.String, class scala.collection.immutable.Nil$]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:333)

Any idea? Olivier.

Le mar. 2 juin 2015 à 18:02, Holden Karau hol...@pigscanfly.ca a écrit :

Not super easily, the GroupedData class uses a strToExpr function which has a pretty limited set of functions so we can't pass in the name of an arbitrary Hive UDAF (unless I'm missing something). We can instead construct a column with the expression you want and then pass it in to agg() that way (although then you need to call the Hive UDAF there). There are some private classes in hiveUdfs.scala which expose hiveUdaf's as Spark SQL AggregateExpressions, but they are private.

On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

I've finally come to the same conclusion, but isn't there any way to call this Hive UDAF from agg("percentile(key,0.5)")??

Le mar. 2 juin 2015 à 15:37, Yana Kadiyska yana.kadiy...@gmail.com a écrit :

Like this... sqlContext should be a HiveContext instance:

case class KeyValue(key: Int, value: String)
val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
df.registerTempTable("table")
sqlContext.sql("select percentile(key,0.5) from table").show()

On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

Hi everyone, Is there any way to compute a median on a column using Spark's DataFrame? I know you can use stats in an RDD but I'd rather stay within a DataFrame. Hive seems to imply that using ntile one can compute percentiles, quartiles and therefore a median. Does anyone have experience with this? Regards, Olivier.

-- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
Re: Best strategy for Pandas - Spark
Thanks for the answer, I'm currently doing exactly that. I'll try to sum up the usual Pandas => Spark DataFrame caveats soon. Regards, Olivier.

Le mar. 2 juin 2015 à 02:38, Davies Liu dav...@databricks.com a écrit :

The second one sounds reasonable, I think.

On Thu, Apr 30, 2015 at 1:42 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote:

Hi everyone, Let's assume I have a complex workflow with more than 10 datasources as input and some 20 computations (some creating intermediary datasets and some merging everything for the final computation), some taking on average 1 minute to complete and some taking more than 30 minutes. What would be for you the best strategy to port this to Apache Spark?
1. Transform the whole flow into a Spark job (PySpark or Scala)
2. Transform only part of the flow (the heavy-lifting ~30 min parts) using the same language (PySpark)
3. Transform only part of the flow and pipe the rest from Scala to Python
Regards, Olivier.
Re: Re: spark 1.3.1 jars in repo1.maven.org
Ryan - I sent a PR to fix your issue: https://github.com/apache/spark/pull/6599

Edward - I have no idea why the following error happened. ContextCleaner doesn't use any Hadoop API. Could you try Spark 1.3.0? It's supposed to support both Hadoop 1 and Hadoop 2.

* Exception in thread "Spark Context Cleaner" java.lang.NoClassDefFoundError: 0
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)

Best Regards, Shixiong Zhu

2015-06-03 0:08 GMT+08:00 Ryan Williams ryan.blake.willi...@gmail.com:

I think this is causing issues upgrading ADAM (https://github.com/bigdatagenomics/adam) to Spark 1.3.1 (cf. adam#690, https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383); attempting to build against Hadoop 1.0.4 yields errors like:

2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
*java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected*
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

2015-06-02 15:57:44 WARN TaskSetManager:71 - Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop 2; Spark 1.3.1 expects the interface but is getting the class. It sounds like, while I *can* depend on Spark 1.3.1 and Hadoop 1.0.4, I then need to hope that I don't exercise certain Spark code paths that run afoul of differences between Hadoop 1 and 2; does that seem correct? Thanks!

On Wed, May 20, 2015 at 1:52 PM Sean Owen so...@cloudera.com wrote:

I don't think any of those problems are related to Hadoop. Have you looked at the userClassPathFirst settings?

On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com wrote:

Hi Sean and Ted, Thanks for your replies. I don't have our current problems nicely written up as good questions yet. I'm still sorting out classpath issues, etc.
In case it is of help, I'm seeing:

* Exception in thread "Spark Context Cleaner" java.lang.NoClassDefFoundError: 0
at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149)
* We've been having clashing dependencies between a colleague and me because of the aforementioned classpath issue
* The clashing dependencies are also causing issues with which Jetty libraries are available in the classloader from Spark and don't clash with existing libraries we have.

More anon, Cheers, Edward

Original Message
Subject: Re: spark 1.3.1 jars in repo1.maven.org
Date: 2015-05-20 00:38
From: Sean Owen so...@cloudera.com
To: Edward Sargisson esa...@pobox.com
Cc: user user@spark.apache.org

Yes, the published artifacts can only refer to one version of anything (OK, modulo publishing a large number of variants under classifiers). You aren't intended to rely on Spark's transitive dependencies for anything. Compiling against the Spark API has no relation to what version of Hadoop it binds against because it's not part of any API. You mark the Spark dependency as "provided" in your build and get all the Spark/Hadoop bindings at runtime from your cluster. What problem are you experiencing?

On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com wrote: Hi,
Re: Re: spark 1.3.1 jars in repo1.maven.org
Thanks so much Shixiong! This is great. On Tue, Jun 2, 2015 at 8:26 PM Shixiong Zhu zsxw...@gmail.com wrote: Ryan - I sent a PR to fix your issue: https://github.com/apache/spark/pull/6599 Edward - I have no idea why the following error happened. ContextCleaner doesn't use any Hadoop API. Could you try Spark 1.3.0? It's supposed to support both hadoop 1 and hadoop 2. * Exception in thread Spark Context Cleaner java.lang.NoClassDefFoundError: 0 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149) Best Regards, Shixiong Zhu 2015-06-03 0:08 GMT+08:00 Ryan Williams ryan.blake.willi...@gmail.com: I think this is causing issues upgrading ADAM https://github.com/bigdatagenomics/adam to Spark 1.3.1 (cf. adam#690 https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383); attempting to build against Hadoop 1.0.4 yields errors like: 2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) *java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected* at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95) at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2015-06-02 15:57:44 WARN TaskSetManager:71 - Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95) at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop 2; Spark 1.3.1 expects the interface but is getting the class. It sounds like, while I *can* depend on Spark 1.3.1 and Hadoop 1.0.4, I then need to hope that I don't exercise certain Spark code paths that run afoul of differences between Hadoop 1 and 2; does that seem correct? Thanks! On Wed, May 20, 2015 at 1:52 PM Sean Owen so...@cloudera.com wrote: I don't think any of those problems are related to Hadoop. Have you looked at userClassPathFirst settings? On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com wrote: Hi Sean and Ted, Thanks for your replies. 
I don't have our current problems nicely written up as good questions yet. I'm still sorting out classpath issues, etc. In case it is of help, I'm seeing: * Exception in thread Spark Context Cleaner java.lang.NoClassDefFoundError: 0 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149) * We've been having clashing dependencies between a colleague and I because of the aforementioned classpath issue * The clashing dependencies are also causing issues with what jetty libraries are available in the classloader from Spark and don't clash with existing libraries we have. More anon, Cheers, Edward Original Message Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38 From: Sean Owen so...@cloudera.com To: Edward Sargisson esa...@pobox.com Cc: user user@spark.apache.org Yes, the published artifacts can only refer to one version of anything (OK, modulo publishing a large number of variants under classifiers). You aren't intended to rely on Spark's transitive dependencies for anything. Compiling against the Spark API has no relation to what version of Hadoop it binds against because it's not part of any API. You mark the Spark dependency even as provided in your build and get all the Spark/Hadoop bindings at runtime from our cluster.
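Sean's "provided" advice above, as a minimal sbt sketch (the version number is illustrative):

// mark Spark as provided so the cluster supplies the Spark/Hadoop bindings at runtime;
// your application jar then carries neither Spark nor Hadoop classes of its own
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"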
Re: Spark 1.4 YARN Application Master fails with 500 connect refused
Just testing with Spark 1.3, it looks like it sets the proxy correctly to be the YARN RM host (0101):

15/06/03 10:34:19 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT]
15/06/03 10:34:20 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1432690361766_0596_01
15/06/03 10:34:20 INFO spark.SecurityManager: Changing view acls to: nw
15/06/03 10:34:20 INFO spark.SecurityManager: Changing modify acls to: nw
15/06/03 10:34:20 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nw); users with modify permissions: Set(nw)
15/06/03 10:34:20 INFO slf4j.Slf4jLogger: Slf4jLogger started
15/06/03 10:34:21 INFO Remoting: Starting remoting
15/06/03 10:34:21 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkYarnAM@qtausc-pphd0137.hadoop.local:43972]
15/06/03 10:34:21 INFO util.Utils: Successfully started service 'sparkYarnAM' on port 43972.
15/06/03 10:34:21 INFO yarn.ApplicationMaster: Waiting for Spark driver to be reachable.
15/06/03 10:34:21 INFO yarn.ApplicationMaster: Driver now available: edge-node-77.skynet.hadoop:36387
15/06/03 10:34:21 INFO yarn.ApplicationMaster: Listen to driver: akka.tcp://sparkDriver@edge-node-77.skynet.hadoop:36387/user/YarnScheduler
*15/06/03 10:34:21 INFO yarn.ApplicationMaster: Add WebUI Filter. AddWebUIFilter(org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter,Map(PROXY_HOSTS -> qtausc-pphd0101.hadoop.local, PROXY_URI_BASES -> http://qtausc-pphd0101.hadoop.local:8088/proxy/application_1432690361766_0596),/proxy/application_1432690361766_0596)*
15/06/03 10:34:21 INFO yarn.YarnRMClient: Registering the ApplicationMaster
15/06/03 10:34:21 INFO yarn.YarnAllocator: Will request 2 executor containers, each with 1 cores and 1408 MB memory including 384 MB overhead
15/06/03 10:34:21 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:1408, vCores:1, disks:0.0>)
15/06/03 10:34:21 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:1408, vCores:1, disks:0.0>)
15/06/03 10:34:21 INFO yarn.ApplicationMaster: Started progress reporter thread - sleep time : 5000
15/06/03 10:34:21 INFO impl.AMRMClientImpl: Received new token for : qtausc-pphd0151.hadoop.local:50941
15/06/03 10:34:21 INFO yarn.YarnAllocator: Launching container container_1432690361766_0596_01_02 for on host qtausc-pphd0151.hadoop.local
15/06/03 10:34:21 INFO yarn.YarnAllocator: Launching ExecutorRunnable. driverUrl: akka.tcp://sparkDriver@edge-node-77.skynet.hadoop:36387/user/CoarseGrainedScheduler, executorHostname: qtausc-pphd0151.hadoop.local
15/06/03 10:34:21 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Starting Executor Container
15/06/03 10:34:21 INFO impl.ContainerManagementProtocolProxy: yarn.client.max-nodemanagers-proxies : 500
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Setting up ContainerLaunchContext
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Preparing Local resources
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Prepared Local resources Map(__spark__.jar -> resource { scheme: maprfs port: -1 file: /user/nw/.sparkStaging/application_1432690361766_0596/spark-assembly-1.3.1-hadoop2.5.1-mapr-1501.jar } size: 130013450 timestamp: 1433291656330 type: FILE visibility: PRIVATE)
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Setting up executor with environment: Map(CLASSPATH -> {{PWD}}<CPS>{{PWD}}/__spark__.jar<CPS>$HADOOP_CONF_DIR<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>/opt/mapr/lib/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/yarn/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/common/lib/*:/opt/mapr/hive/hive-current/lib/*, SPARK_LOG_URL_STDERR -> http://qtausc-pphd0151.hadoop.local:8042/node/containerlogs/container_1432690361766_0596_01_02/nw/stderr?start=0, SPARK_DIST_CLASSPATH -> /opt/mapr/lib/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/yarn/*:/opt/mapr/hadoop/hadoop-2.5.1/share/hadoop/common/lib/*:/opt/mapr/hive/hive-current/lib/*, SPARK_YARN_STAGING_DIR -> .sparkStaging/application_1432690361766_0596, SPARK_YARN_CACHE_FILES_FILE_SIZES -> 130013450, SPARK_USER -> nw, SPARK_YARN_CACHE_FILES_VISIBILITIES -> PRIVATE, SPARK_YARN_MODE -> true, SPARK_YARN_CACHE_FILES_TIME_STAMPS -> 1433291656330, SPARK_LOG_URL_STDOUT -> http://qtausc-pphd0151.hadoop.local:8042/node/containerlogs/container_1432690361766_0596_01_02/nw/stdout?start=0, SPARK_YARN_CACHE_FILES -> maprfs:/user/nw/.sparkStaging/application_1432690361766_0596/spark-assembly-1.3.1-hadoop2.5.1-mapr-1501.jar#__spark__.jar)
15/06/03 10:34:21 INFO yarn.ExecutorRunnable: Setting up executor with commands:
Re: Spark 1.4 YARN Application Master fails with 500 connect refused
That code hasn't changed at all between 1.3 and 1.4; it also has been working fine for me. Are you sure you're using exactly the same Hadoop libraries (since you're building with -Phadoop-provided) and Hadoop configuration in both cases?

On Tue, Jun 2, 2015 at 5:29 PM, Night Wolf nightwolf...@gmail.com wrote:

Hi all, Trying out Spark 1.4 on MapR Hadoop 2.5.1 running in yarn-client mode. Seems the application master doesn't work anymore; I get a 500 connect refused, even when I hit the IP/port of the Spark UI directly. The logs don't show much. I built Spark with Java 6, Hive, and both Scala 2.10 and 2.11. I've tried with and without -Phadoop-provided.

*Build command:*
./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn -Pmapr4 -Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided -Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests -e -X

*Logs from spark shell:*
15/06/03 00:10:56 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/06/03 00:10:56 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/06/03 00:10:56 INFO ui.SparkUI: Started SparkUI at http://172.31.10.14:4040
15/06/03 00:10:57 INFO yarn.Client: Requesting a new application from cluster with 71 NodeManagers
15/06/03 00:10:57 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (112640 MB per container)
15/06/03 00:10:57 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/06/03 00:10:57 INFO yarn.Client: Setting up container launch context for our AM
15/06/03 00:10:57 INFO yarn.Client: Preparing resources for our AM container
15/06/03 00:10:57 INFO yarn.Client: Uploading resource file:///apps/spark/spark-1.4.0-SNAPSHOT-bin-mapr4.0.2_yarn_j6_2.11/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.1-mapr-1501.jar -> maprfs:/user/nw/.sparkStaging/application_1432690361766_0593/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.1-mapr-1501.jar
15/06/03 00:10:58 INFO yarn.Client: Uploading resource file:/tmp/spark-5e42f904-ff83-4c93-bd35-4c3e20226a8a/__hadoop_conf__983379693214711.zip -> maprfs:/user/nw/.sparkStaging/application_1432690361766_0593/__hadoop_conf__983379693214711.zip
15/06/03 00:10:58 INFO yarn.Client: Setting up the launch environment for our AM container
15/06/03 00:10:58 INFO spark.SecurityManager: Changing view acls to: nw
15/06/03 00:10:58 INFO spark.SecurityManager: Changing modify acls to: nw
15/06/03 00:10:58 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nw); users with modify permissions: Set(nw)
15/06/03 00:10:58 INFO yarn.Client: Submitting application 593 to ResourceManager
15/06/03 00:10:58 INFO security.ExternalTokenManagerFactory: Initialized external token manager class - com.mapr.hadoop.yarn.security.MapRTicketManager
15/06/03 00:10:58 INFO impl.YarnClientImpl: Submitted application application_1432690361766_0593
15/06/03 00:10:59 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED)
15/06/03 00:10:59 INFO yarn.Client: client token: N/A
diagnostics: N/A
ApplicationMaster host: N/A
ApplicationMaster RPC port: -1
queue: default
start time: 1433290258143
final status: UNDEFINED
tracking URL: http://qtausc-pphd0101.hadoop.local:8088/proxy/application_1432690361766_0593/
user: nw
15/06/03 00:11:00 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED)
15/06/03 00:11:01 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED)
15/06/03 00:11:02 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED)
15/06/03 00:11:03 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED)
15/06/03 00:11:03 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as AkkaRpcEndpointRef(Actor[akka.tcp://sparkYarnAM@192.168.81.167:36542/user/YarnAM#1631897818])
15/06/03 00:11:03 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> qtausc-pphd0167.hadoop.local, PROXY_URI_BASES -> http://qtausc-pphd0167.hadoop.local:8088/proxy/application_1432690361766_0593), /proxy/application_1432690361766_0593
15/06/03 00:11:03 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
15/06/03 00:11:04 INFO yarn.Client: Application report for application_1432690361766_0593 (state: RUNNING)
15/06/03 00:11:04 INFO yarn.Client: client token: N/A
diagnostics: N/A
ApplicationMaster host: 192.168.81.167
ApplicationMaster RPC port: 0
queue: default
start time: 1433290258143
final status: UNDEFINED
tracking URL: http://qtausc-pphd0101.hadoop.local:8088/proxy/application_1432690361766_0593/
user: nw
15/06/03
Re: Re: spark 1.3.1 jars in repo1.maven.org
We are having a separate discussion about this but, I don't understand why this is a problem? You're supposed to build Spark for Hadoop 1 if you run it on Hadoop 1 and I am not sure that is happening here, given the error. I do not think this should change as I do not see that it fixes something. Let's please concentrate the follow up on the JIRA since you already made one. On Wed, Jun 3, 2015 at 2:26 AM, Shixiong Zhu zsxw...@gmail.com wrote: Ryan - I sent a PR to fix your issue: https://github.com/apache/spark/pull/6599 Edward - I have no idea why the following error happened. ContextCleaner doesn't use any Hadoop API. Could you try Spark 1.3.0? It's supposed to support both hadoop 1 and hadoop 2. * Exception in thread Spark Context Cleaner java.lang.NoClassDefFoundError: 0 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149) Best Regards, Shixiong Zhu 2015-06-03 0:08 GMT+08:00 Ryan Williams ryan.blake.willi...@gmail.com: I think this is causing issues upgrading ADAM https://github.com/bigdatagenomics/adam to Spark 1.3.1 (cf. adam#690 https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383); attempting to build against Hadoop 1.0.4 yields errors like: 2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0) *java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected* at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95) at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 2015-06-02 15:57:44 WARN TaskSetManager:71 - Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95) at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082) at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop 2; Spark 1.3.1 expects the interface but is getting the class. It sounds like, while I *can* depend on Spark 1.3.1 and Hadoop 1.0.4, I then need to hope that I don't exercise certain Spark code paths that run afoul of differences between Hadoop 1 and 2; does that seem correct? Thanks! 
On Wed, May 20, 2015 at 1:52 PM Sean Owen so...@cloudera.com wrote: I don't think any of those problems are related to Hadoop. Have you looked at userClassPathFirst settings? On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com wrote: Hi Sean and Ted, Thanks for your replies. I don't have our current problems nicely written up as good questions yet. I'm still sorting out classpath issues, etc. In case it is of help, I'm seeing: * Exception in thread Spark Context Cleaner java.lang.NoClassDefFoundError: 0 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149) * We've been having clashing dependencies between a colleague and I because of the aforementioned classpath issue * The clashing dependencies are also causing issues with what jetty libraries are available in the classloader from Spark and don't clash with existing libraries we have. More anon, Cheers, Edward Original Message Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38 From: Sean Owen so...@cloudera.com To: Edward Sargisson esa...@pobox.com Cc: user user@spark.apache.org Yes, the published artifacts can only refer to one version of anything (OK, modulo publishing a large number of variants under
Re: Can't build Spark
Spark 1.3.1, Scala 2.11.6, Maven 3.3.3. I'm behind a proxy and have set my proxy settings in the Maven settings. Thanks,

On Tue, Jun 2, 2015 at 2:54 PM, Ted Yu yuzhih...@gmail.com wrote:

Can you give us some more information? Such as:
- which Spark release you were building
- what command you used
- which Scala version you used

Thanks

On Tue, Jun 2, 2015 at 2:50 PM, Mulugeta Mammo mulugeta.abe...@gmail.com wrote:

building Spark is throwing errors, any ideas?

[FATAL] Non-resolvable parent POM: Could not transfer artifact org.apache:apache:pom:14 from/to central (http://repo.maven.apache.org/maven2): Error transferring file: repo.maven.apache.org from http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom and 'parent.relativePath' points at wrong local POM @ line 21, column 11
at org.apache.maven.model.building.DefaultModelProblemCollector.newModelBuildingException(DefaultModelProblemCollector.java:195)
at org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:841)
Re: Embedding your own transformer in Spark.ml Pipeline
I found this: https://spark.apache.org/docs/1.2.0/api/java/org/apache/spark/ml/feature/Tokenizer.html which indicates the Tokenizer did exist in Spark 1.2.0; so is it not in 1.2.1?

On Tue, Jun 2, 2015 at 12:45 PM, Peter Rudenko petro.rude...@gmail.com wrote:

I'm afraid there's no such class in 1.2.1. This API was added in 1.3.0 AFAIK.

On 2015-06-02 21:40, Dimp Bhat wrote: Thanks Peter. Can you share the Tokenizer.java class for Spark 1.2.1. Dimple

On Tue, Jun 2, 2015 at 10:51 AM, Peter Rudenko petro.rude...@gmail.com wrote:

Hi Dimple, take a look at the existing transformers:
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/OneHotEncoder.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
(the last one is for Spark 1.4)

The idea is just to implement a class that extends Transformer with HasInputCol with HasOutputCol (if your transformer is a 1:1 column transformer) and has a def transform(dataset: DataFrame): DataFrame method. Thanks, Peter

On 2015-06-02 20:19, dimple wrote: Hi, I would like to embed my own transformer in the Spark.ml Pipeline but do not see an example of it. Can someone share an example of which classes/interfaces I need to extend/implement in order to do so? Thanks. Dimple

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
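Following Peter's pointers, a minimal sketch of a custom 1:1 column transformer against the 1.4-style API (the class name and behavior are made up for illustration; in 1.3.x the transform signature took an extra ParamMap, and some releases also expect a copy(extra: ParamMap) override):

import org.apache.spark.ml.UnaryTransformer
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{DataType, StringType}

// toy Transformer that lower-cases one string column into another;
// UnaryTransformer supplies the HasInputCol/HasOutputCol plumbing
class LowerCaser(override val uid: String)
    extends UnaryTransformer[String, String, LowerCaser] {
  def this() = this(Identifiable.randomUID("lowerCaser"))
  override protected def createTransformFunc: String => String = _.toLowerCase
  override protected def outputDataType: DataType = StringType
}

// usage: new LowerCaser().setInputCol("text").setOutputCol("textLower")
// can then be dropped into a Pipeline like any built-in stage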
Re: Can't build Spark
Can you give us some more information? Such as:
- which Spark release you were building
- what command you used
- which Scala version you used

Thanks

On Tue, Jun 2, 2015 at 2:50 PM, Mulugeta Mammo mulugeta.abe...@gmail.com wrote:

building Spark is throwing errors, any ideas?

[FATAL] Non-resolvable parent POM: Could not transfer artifact org.apache:apache:pom:14 from/to central (http://repo.maven.apache.org/maven2): Error transferring file: repo.maven.apache.org from http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom and 'parent.relativePath' points at wrong local POM @ line 21, column 11
at org.apache.maven.model.building.DefaultModelProblemCollector.newModelBuildingException(DefaultModelProblemCollector.java:195)
at org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:841)
Scripting with groovy
Hi all, Has anyone tried to add scripting capabilities to Spark Streaming using Groovy? I would like to stop the streaming context, update a transformation function written in Groovy (for example, to manipulate JSON), restart the streaming context, and obtain the new behavior without re-submitting the application. Is it possible? Do you think it makes sense, or is there a smarter way to accomplish that? Thanks, Paolo

Sent from my Windows Phone
Re: map - reduce only with disk
Yup, exactly. All the workers will use local disk in addition to RAM, but maybe one thing you need to configure is the directory to use for that. It should be set through spark.local.dir. By default it's /tmp, which on some machines is also in RAM, so that could be a problem. You should set it to a folder on a real disk, or even better, a comma-separated list of disks (e.g. /mnt1,/mnt2) if you have multiple disks. Matei

On Jun 2, 2015, at 1:03 PM, Octavian Ganea octavian.ga...@inf.ethz.ch wrote:

Thanks a lot! So my understanding is that you call persist only if you need to use the RDD at least twice to compute different things. So, if I just need the RDD for a single scan, like in a simple flatMap(..).reduceByKey(..).saveAsTextFile(..), how do I force the slaves to use the hard disk (in addition to the RAM) when the RAM is full, and not to fail like they do now? Thank you!

2015-06-02 21:25 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com:

You shouldn't have to persist the RDD at all, just call flatMap and reduce on it directly. If you try to persist it, that will try to load the original data into memory, but here you are only scanning through it once. Matei

On Jun 2, 2015, at 2:09 AM, Octavian Ganea octavian.ga...@inf.ethz.ch wrote:

Thanks, I was actually using reduceByKey, not groupByKey. I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey. However, I got the same error as before, namely the error described here: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur in a set of documents at least 5 times. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs that are spilled by flatMap might need to be stored on the disk, but I don't understand why the persist option does not work and my job fails. My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
  .reduceByKey((a, b) => (a + b).toShort)
  .filter({ case ((x, y), count) => count >= 5 })

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. One node I keep for the master, 7 nodes for the workers.

my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:

ulimit -n 20
SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1

Thank you a lot for your help!

2015-06-02 4:40 GMT+02:00 Matei Zaharia matei.zaha...@gmail.com:

As long as you don't use cache(), these operations will go from disk to disk, and will only use a fixed amount of memory to build some intermediate results. However, note that because you're using groupByKey, that needs the values for each key to all fit in memory at once. In this case, if you're going to reduce right after, you should use reduceByKey, which will be more efficient. Matei

On Jun 1, 2015, at 2:21 PM, octavian.ganea octavian.ga...@inf.ethz.ch wrote:

Dear all, Does anyone know how I can force Spark to use only the disk when doing a simple flatMap(..).groupByKey.reduce(_ + _)? Thank you!
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/map-reduce-only-with-disk-tp23102.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
- To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org

-- Octavian Ganea, Research assistant at ETH Zurich, octavian.ga...@inf.ethz.ch, http://da.inf.ethz.ch/people/OctavianGanea/
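Matei's spark.local.dir suggestion as a minimal sketch (the paths are placeholders for real local disks):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("pair-count")
  // spill shuffle/intermediate data to real disks instead of a RAM-backed /tmp
  .set("spark.local.dir", "/mnt1/spark,/mnt2/spark")
val sc = new SparkContext(conf)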
Can't build Spark
building Spark is throwing errors, any ideas?

[FATAL] Non-resolvable parent POM: Could not transfer artifact org.apache:apache:pom:14 from/to central (http://repo.maven.apache.org/maven2): Error transferring file: repo.maven.apache.org from http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom and 'parent.relativePath' points at wrong local POM @ line 21, column 11
at org.apache.maven.model.building.DefaultModelProblemCollector.newModelBuildingException(DefaultModelProblemCollector.java:195)
at org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:841)
Re: How to monitor Spark Streaming from Kafka?
Nobody mentioned CM yet? Kafka is now supported by CM/CDH 5.4: http://www.cloudera.com/content/cloudera/en/documentation/cloudera-kafka/latest/PDF/cloudera-kafka.pdf -- Ruslan Dautkhanov

On Mon, Jun 1, 2015 at 5:19 PM, Dmitry Goldenberg dgoldenberg...@gmail.com wrote: Thank you, Tathagata, Cody, Otis. - Dmitry

On Mon, Jun 1, 2015 at 6:57 PM, Otis Gospodnetic otis.gospodne...@gmail.com wrote:

I think you can use SPM - http://sematext.com/spm - it will give you all Spark and all Kafka metrics, including offsets broken down by topic, etc. out of the box. I see more and more people using it to monitor various components in data processing pipelines, a la http://blog.sematext.com/2015/04/22/monitoring-stream-processing-tools-cassandra-kafka-and-spark/ Otis

On Mon, Jun 1, 2015 at 5:23 PM, dgoldenberg dgoldenberg...@gmail.com wrote:

Hi, What are some of the good/adopted approaches to monitoring Spark Streaming from Kafka? I see that there are things like http://quantifind.github.io/KafkaOffsetMonitor, for example. Do they all assume that Receiver-based streaming is used? Then: "Note that one disadvantage of this approach (Receiverless Approach, #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based Kafka monitoring tools will not show progress. However, you can access the offsets processed by this approach in each batch and update Zookeeper yourself." The code sample, however, seems sparse. What do you need to do here?

directKafkaStream.foreachRDD(
  new Function<JavaPairRDD<String, String>, Void>() {
    @Override
    public Void call(JavaPairRDD<String, String> rdd) throws IOException {
      OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd).offsetRanges();
      // offsetRanges.length = # of Kafka partitions being consumed
      ...
      return null;
    }
  }
);

and if these are updated, will KafkaOffsetMonitor work? Monitoring seems to center around the notion of a consumer group. But in the receiverless approach, code on the Spark consumer side doesn't seem to expose a consumer group parameter. Where does it go? Can I/should I just pass in group.id as part of the kafkaParams HashMap? Thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
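In Scala, a minimal sketch of reading the offset ranges from the direct stream (how you then persist them to ZooKeeper is up to you, so that part is left as a comment):

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// directKafkaStream is the DStream returned by KafkaUtils.createDirectStream
directKafkaStream.foreachRDD { rdd =>
  val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  ranges.foreach { r =>
    // r.topic, r.partition, r.fromOffset, r.untilOffset describe what this batch consumed;
    // write r.untilOffset to ZooKeeper here if you want ZK-based monitors to show progress
    println(s"${r.topic}/${r.partition}: ${r.fromOffset} -> ${r.untilOffset}")
  }
}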
Re: Can't build Spark
I ran dev/change-version-to-2.11.sh first. I used the following command but didn't reproduce the error below: mvn -DskipTests -Phadoop-2.4 -Pyarn -Phive clean package My env: maven 3.3.1 Possibly the error was related to proxy setting. FYI On Tue, Jun 2, 2015 at 3:14 PM, Mulugeta Mammo mulugeta.abe...@gmail.com wrote: Spark 1.3.1, Scala 2.11.6, Maven 3.3.3, I'm behind proxy, have set my proxy settings in maven settings. Thanks, On Tue, Jun 2, 2015 at 2:54 PM, Ted Yu yuzhih...@gmail.com wrote: Can you give us some more information ? Such as: which Spark release you were building what command you used Scala version you used Thanks On Tue, Jun 2, 2015 at 2:50 PM, Mulugeta Mammo mulugeta.abe...@gmail.com wrote: building Spark is throwing errors, any ideas? [FATAL] Non-resolvable parent POM: Could not transfer artifact org.apache:apache:pom:14 from/to central ( http://repo.maven.apache.org/maven2): Error transferring file: repo.maven.apache.org from http://repo.maven.apache.org/maven2/org/apache/apache/14/apache-14.pom and 'parent.relativePath' points at wrong local POM @ line 21, column 11 at org.apache.maven.model.building.DefaultModelProblemCollector.newModelBuildingException(DefaultModelProblemCollector.java:195) at org.apache.maven.model.building.DefaultModelBuilder.readParentExternally(DefaultModelBuilder.java:841)
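If the failure is indeed the proxy, Maven reads its proxy configuration from ~/.m2/settings.xml; a minimal sketch of the relevant section (host and port are placeholders for your corporate proxy):

<settings>
  <proxies>
    <proxy>
      <id>corp-http</id>
      <active>true</active>
      <protocol>http</protocol>
      <host>proxy.example.com</host>
      <port>8080</port>
      <nonProxyHosts>localhost</nonProxyHosts>
    </proxy>
  </proxies>
</settings>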
Behavior of the spark.streaming.kafka.maxRatePerPartition config param?
Hi, Could someone explain the behavior of the spark.streaming.kafka.maxRatePerPartition parameter? The doc says "An important (configuration) is spark.streaming.kafka.maxRatePerPartition which is the maximum rate at which each Kafka partition will be read by (the) direct API."

What is the default behavior for this parameter? From some testing it appears that with it not set, the RDD size tends to be quite low. With it set, we're seeing the consumer picking up items off the topic much more actively, e.g. -Dspark.streaming.kafka.maxRatePerPartition=1000 in --driver-java-options. Does this parameter set the RDD size to a very low value? It seems to default to 0... but what's the effect of that?

protected val maxMessagesPerPartition: Option[Long] = {
  val ratePerSec = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRatePerPartition", 0)
  if (ratePerSec > 0) {
    val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
    Some((secsPerBatch * ratePerSec).toLong)
  } else {
    None
  }
}

// limits the maximum number of messages per partition
protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
  maxMessagesPerPartition.map { mmp =>
    leaderOffsets.map { case (tp, lo) =>
      tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
    }
  }.getOrElse(leaderOffsets)
}

What would we limit by default? And once Spark Streaming does pick up messages, would it be at the maximum value? Does it ever fall below the max even if there are max or more than max messages in the topic? Thanks.

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Behavior-of-the-spark-streaming-kafka-maxRatePerPartition-config-param-tp23117.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
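To make the quoted clamp logic concrete, a small worked example (the numbers are illustrative):

// with spark.streaming.kafka.maxRatePerPartition=1000 and a 2-second batch interval:
val ratePerSec = 1000
val secsPerBatch = 2.0
val maxMessagesPerPartition = (secsPerBatch * ratePerSec).toLong // = 2000
// clamp then caps each partition of the batch at currentOffset + 2000;
// with the default of 0, maxMessagesPerPartition is None and the batch
// reads everything up to the latest leader offset, however much that is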
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi, Thanks for your information. I'll give Spark 1.4 a try when it's released.

On Wed, Jun 3, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote:

Could you try it out with Spark 1.4 RC3? Also pinging Cloudera folks, they may be aware of something. BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe with a script), take snapshots of the histogram and also do memory dumps. Say every hour. And then compare the difference between two histos/dumps that are a few hours apart (the more the better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM; it will show the diff in the objects that got added in the later dump. That makes it easy to debug what is not getting cleaned. TD

On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

Hi, Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:

num #instances #bytes class name
--
1: 40802 145083848 [B
2: 99264 12716112 methodKlass
3: 99264 12291480 constMethodKlass
4: 84729144816 constantPoolKlass
5: 84727625192 instanceKlassKlass
6: 1866097824 [Lscala.concurrent.forkjoin.ForkJoinTask;
7: 70454804832 constantPoolCacheKlass
8: 1391684453376 java.util.HashMap$Entry
9: 94273542512 methodDataKlass
10: 1413123391488 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
11: 1354913251784 java.lang.Long
12: 261922765496 [C
13: 8131140560 [Ljava.util.HashMap$Entry;
14: 89971061936 java.lang.Class
15: 16022 851384 [[I
16: 16447 789456 java.util.zip.Inflater
17: 13855 723376 [S
18: 17282 691280 java.lang.ref.Finalizer
19: 25725 617400 java.lang.String
20: 320 570368 [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
21: 16066 514112 java.util.concurrent.ConcurrentHashMap$HashEntry
22: 12288 491520 org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
23: 13343 426976 java.util.concurrent.locks.ReentrantLock$NonfairSync
24: 12288 396416 [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
25: 16447 394728 java.util.zip.ZStreamRef
26: 565 370080 [I
27: 508 272288 objArrayKlassKlass
28: 16233 259728 java.lang.Object
29: 771 209232 [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
30: 2524 192312 [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data. I can't find a way to figure out what is in there. Besides, I did some extra experiments, i.e. ran the same program in different environments to test whether it has an off-heap memory issue:

spark1.0 + standalone = no
spark1.0 + yarn = no
spark1.3 + standalone = no
spark1.3 + yarn = yes

I'm using CDH 5.1, so spark1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website. I could use spark1.0 + yarn, but I can't find a way to handle the logs, level and rolling, so it'll explode the hard drive. Currently I'll stick to spark1.0 + standalone, until our ops team decides to upgrade CDH.

On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote:

While you are running, is it possible for you to log into the YARN node and get histograms of live objects using jmap -histo:live? That may reveal something.

On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

Hi, Unfortunately, they're still growing, both driver and executors. I ran the same job in local mode; everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Can you replace your counting part with this?
logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))

Thanks Best Regards

On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote:

Hi, I wrote a simple test job; it only does very basic operations. For example:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
val logs = lines.flatMap { line =>
  try {
    Some(parse(line).extract[Impression])
  } catch {
    case _: Exception => None
  }
}
logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    iter.foreach(count => logger.info(count.toString))
  }
}

It receives messages from Kafka, parses the json, filters and counts the records, and then prints it to the logs. Thanks.
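For reference, the test job above as a self-contained sketch (a sketch under assumptions: the Impression schema, app name, batch interval, and Kafka/ZooKeeper endpoints are placeholders pieced together from the fragments in this thread):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods.parse

case class Impression(s_id: Long)

object KafkaCountTest {
  def main(args: Array[String]): Unit = {
    implicit val formats = DefaultFormats
    val ssc = new StreamingContext(new SparkConf().setAppName("kafka-count"), Seconds(2))
    // one receiver thread on a single topic; endpoints are placeholders
    val lines = KafkaUtils.createStream(ssc, "zk-host:2181", "test-group", Map("topic" -> 1)).map(_._2)
    val logs = lines.flatMap { line =>
      // drop lines that fail to parse into the Impression schema
      try Some(parse(line).extract[Impression]) catch { case _: Exception => None }
    }
    logs.filter(_.s_id > 0).count().foreachRDD { rdd =>
      rdd.foreach(count => println(count))
    }
    ssc.start()
    ssc.awaitTermination()
  }
}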
Application is always in process when I check out logs of completed application
I run a Spark application in a Spark standalone cluster with client deploy mode. I want to check out the logs of my finished application, but I always get a page telling me "Application history not found - Application xxx is still in process." I am pretty sure that the application has indeed completed, because I can see it in the Completed Applications list shown by the Spark WebUI, and I have also found the log file with the suffix .inprocess in the directory set by spark.eventLog.dir in my spark-defaults.conf. Oh, BTW, I am using Spark 1.3.0. So, is there anything I missed?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Application-is-always-in-process-when-I-check-out-logs-of-completed-application-tp23123.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Could you try it out with Spark 1.4 RC3? Also pinging Cloudera folks, they may be aware of something. BTW, the way I have debugged memory leaks in the past is as follows. Run with a small driver memory, say 1 GB. Periodically (maybe with a script), take snapshots of the histogram and also do memory dumps. Say every hour. And then compare the difference between two histos/dumps that are a few hours apart (the more the better). Diffing histos is easy. Diffing two dumps can be done in JVisualVM; it will show the diff in the objects that got added in the later dump. That makes it easy to debug what is not getting cleaned. TD

On Tue, Jun 2, 2015 at 7:33 PM, Ji ZHANG zhangj...@gmail.com wrote:

Hi, Thanks for your reply. Here are the top 30 entries of the jmap -histo:live result:

num #instances #bytes class name
--
1: 40802 145083848 [B
2: 99264 12716112 methodKlass
3: 99264 12291480 constMethodKlass
4: 84729144816 constantPoolKlass
5: 84727625192 instanceKlassKlass
6: 1866097824 [Lscala.concurrent.forkjoin.ForkJoinTask;
7: 70454804832 constantPoolCacheKlass
8: 1391684453376 java.util.HashMap$Entry
9: 94273542512 methodDataKlass
10: 1413123391488 io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
11: 1354913251784 java.lang.Long
12: 261922765496 [C
13: 8131140560 [Ljava.util.HashMap$Entry;
14: 89971061936 java.lang.Class
15: 16022 851384 [[I
16: 16447 789456 java.util.zip.Inflater
17: 13855 723376 [S
18: 17282 691280 java.lang.ref.Finalizer
19: 25725 617400 java.lang.String
20: 320 570368 [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
21: 16066 514112 java.util.concurrent.ConcurrentHashMap$HashEntry
22: 12288 491520 org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
23: 13343 426976 java.util.concurrent.locks.ReentrantLock$NonfairSync
24: 12288 396416 [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
25: 16447 394728 java.util.zip.ZStreamRef
26: 565 370080 [I
27: 508 272288 objArrayKlassKlass
28: 16233 259728 java.lang.Object
29: 771 209232 [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
30: 2524 192312 [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data. I can't find a way to figure out what is in there. Besides, I did some extra experiments, i.e. ran the same program in different environments to test whether it has an off-heap memory issue:

spark1.0 + standalone = no
spark1.0 + yarn = no
spark1.3 + standalone = no
spark1.3 + yarn = yes

I'm using CDH 5.1, so spark1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website. I could use spark1.0 + yarn, but I can't find a way to handle the logs, level and rolling, so it'll explode the hard drive. Currently I'll stick to spark1.0 + standalone, until our ops team decides to upgrade CDH.

On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote:

While you are running, is it possible for you to log into the YARN node and get histograms of live objects using jmap -histo:live? That may reveal something.

On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote:

Hi, Unfortunately, they're still growing, both driver and executors. I ran the same job in local mode; everything is fine.

On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote:

Can you replace your counting part with this?
logs.filter(_.s_id 0).foreachRDD(rdd = logger.info(rdd.count())) Thanks Best Regards On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I wrote a simple test job, it only does very basic operations. for example: val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic - 1)).map(_._2) val logs = lines.flatMap { line = try { Some(parse(line).extract[Impression]) } catch { case _: Exception = None } } logs.filter(_.s_id 0).count.foreachRDD { rdd = rdd.foreachPartition { iter = iter.foreach(count = logger.info(count.toString)) } } It receives messages from Kafka, parse the json, filter and count the records, and then print it to logs. Thanks. On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Zhang, Could you paste your
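That periodic-snapshot routine can be scripted; here is a rough Scala sketch using sys.process (the pid, output directory, and 24-hour window are placeholders, and jmap must be on the PATH):

import scala.sys.process._
import java.io.File

val pid = "12345"                        // JVM pid of the driver or an executor
val outDir = new File("/tmp/histos")
outDir.mkdirs()

// One histogram per hour for a day; diff consecutive files later to see
// which classes keep growing.
for (_ <- 1 to 24) {
  val ts = System.currentTimeMillis()
  (s"jmap -histo:live $pid" #> new File(outDir, s"histo-$ts.txt")).!
  Thread.sleep(60 * 60 * 1000)
}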
Re: Spark Streaming yarn-cluster Mode Off-heap Memory Is Constantly Growing
Hi, Thanks for your reply. Here's the top 30 entries of the jmap -histo:live result:

 num     #instances       #bytes  class name
--------------------------------------------
   1:        40802     145083848  [B
   2:        99264      12716112  methodKlass
   3:        99264      12291480  constMethodKlass
   4:         8472       9144816  constantPoolKlass
   5:         8472       7625192  instanceKlassKlass
   6:         1866       6097824  [Lscala.concurrent.forkjoin.ForkJoinTask;
   7:         7045       4804832  constantPoolCacheKlass
   8:       139168       4453376  java.util.HashMap$Entry
   9:         9427       3542512  methodDataKlass
  10:       141312       3391488  io.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry
  11:       135491       3251784  java.lang.Long
  12:        26192       2765496  [C
  13:          813       1140560  [Ljava.util.HashMap$Entry;
  14:         8997       1061936  java.lang.Class
  15:        16022        851384  [[I
  16:        16447        789456  java.util.zip.Inflater
  17:        13855        723376  [S
  18:        17282        691280  java.lang.ref.Finalizer
  19:        25725        617400  java.lang.String
  20:          320        570368  [Lio.netty.buffer.PoolThreadCache$MemoryRegionCache$Entry;
  21:        16066        514112  java.util.concurrent.ConcurrentHashMap$HashEntry
  22:        12288        491520  org.jboss.netty.util.internal.ConcurrentIdentityHashMap$Segment
  23:        13343        426976  java.util.concurrent.locks.ReentrantLock$NonfairSync
  24:        12288        396416  [Lorg.jboss.netty.util.internal.ConcurrentIdentityHashMap$HashEntry;
  25:        16447        394728  java.util.zip.ZStreamRef
  26:          565        370080  [I
  27:          508        272288  objArrayKlassKlass
  28:        16233        259728  java.lang.Object
  29:          771        209232  [Ljava.util.concurrent.ConcurrentHashMap$HashEntry;
  30:         2524        192312  [Ljava.lang.Object;

But as I mentioned above, the heap memory seems OK; the extra memory is consumed by some off-heap data, and I can't find a way to figure out what is in there. Besides, I did some extra experiments, i.e. ran the same program in different environments to test whether it has the off-heap memory issue:

spark 1.0 + standalone = no
spark 1.0 + yarn = no
spark 1.3 + standalone = no
spark 1.3 + yarn = yes

I'm using CDH 5.1, so the spark 1.0 is provided by CDH, and spark-1.3.1-bin-hadoop2.3 is downloaded from the official website. I could use spark 1.0 + yarn, but I can't find a way to handle the logs (level and rolling), so it'll explode the hard drive. Currently I'll stick to spark 1.0 + standalone, until our ops team decides to upgrade CDH. On Tue, Jun 2, 2015 at 2:58 PM, Tathagata Das t...@databricks.com wrote: While you are running, is it possible for you to log into the YARN node and get histograms of live objects using jmap -histo:live? That may reveal something. On Thursday, May 28, 2015, Ji ZHANG zhangj...@gmail.com wrote: Hi, Unfortunately, they're still growing, both driver and executors. I run the same job with local mode and everything is fine. On Thu, May 28, 2015 at 5:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Can you replace your counting part with this?

logs.filter(_.s_id > 0).foreachRDD(rdd => logger.info(rdd.count()))

Thanks Best Regards On Thu, May 28, 2015 at 1:02 PM, Ji ZHANG zhangj...@gmail.com wrote: Hi, I wrote a simple test job; it only does very basic operations, for example:

val lines = KafkaUtils.createStream(ssc, zkQuorum, group, Map(topic -> 1)).map(_._2)
val logs = lines.flatMap { line =>
  try {
    Some(parse(line).extract[Impression])
  } catch {
    case _: Exception => None
  }
}
logs.filter(_.s_id > 0).count.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    iter.foreach(count => logger.info(count.toString))
  }
}

It receives messages from Kafka, parses the JSON, filters and counts the records, and then prints the count to the logs. Thanks. On Thu, May 28, 2015 at 3:07 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Hi Zhang, Could you paste your code in a gist? Not sure what you are doing inside the code to fill up memory. Thanks Best Regards On Thu, May 28, 2015 at 10:08 AM, Ji ZHANG zhangj...@gmail.com wrote: Hi, Yes, I'm using createStream, but the storageLevel param is by default MEMORY_AND_DISK_SER_2. Besides, the driver's memory is also growing. I don't think Kafka messages will be cached in the driver. On Thu, May 28, 2015 at 12:24 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Are you using the createStream or createDirectStream api? If it's the former, you can try setting the StorageLevel to MEMORY_AND_DISK (it might slow things down though). Another way would be to try the latter. Thanks Best Regards On Wed, May 27, 2015 at 1:00 PM, Ji ZHANG
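For reference, a rough sketch of the createDirectStream alternative Akhil mentions (the Spark 1.3 Kafka direct API; broker list and topic name are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka.KafkaUtils

// The direct stream has no receiver and keeps no receiver-side cache of
// messages, so the receiver StorageLevel question goes away entirely.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic")).map(_._2)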
Re: in GraphX, program with Pregel runs slower and slower after several iterations
I've been encountering something similar too. I suspected it was related to the lineage growth of the graph/RDDs, so I checkpoint the graph every 60 Pregel rounds; after doing that, my program doesn't slow down any more (except that every checkpoint takes some extra time). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/in-GraphX-program-with-Pregel-runs-slower-and-slower-after-several-iterations-tp23121p23122.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
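For anyone hitting the same slowdown, a rough sketch of that periodic-checkpoint pattern, assuming a hand-rolled iteration loop rather than a single Pregel call (initialGraph, step, and maxRounds are placeholders):

sc.setCheckpointDir("hdfs:///tmp/graph-checkpoints")

var g = initialGraph.cache()
for (round <- 1 to maxRounds) {
  g = step(g).cache()          // `step` stands in for one Pregel-style round
  if (round % 60 == 0) {
    g.vertices.checkpoint()    // truncate the vertex RDD lineage
    g.edges.checkpoint()       // truncate the edge RDD lineage
    g.vertices.count()         // actions force the checkpoints to materialize
    g.edges.count()
  }
}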
Re: Spark 1.4 YARN Application Master fails with 500 connect refused
Thanks Marcelo - looks like it was my fault. Seems when we deployed the new version of Spark it was picking up the wrong yarn-site and setting the wrong proxy host. All good now! On Wed, Jun 3, 2015 at 11:01 AM, Marcelo Vanzin van...@cloudera.com wrote: That code hasn't changed at all between 1.3 and 1.4; it also has been working fine for me. Are you sure you're using exactly the same Hadoop libraries (since you're building with -Phadoop-provided) and Hadoop configuration in both cases? On Tue, Jun 2, 2015 at 5:29 PM, Night Wolf nightwolf...@gmail.com wrote: Hi all, Trying out Spark 1.4 on MapR Hadoop 2.5.1 running in yarn-client mode. Seems the application master doesn't work anymore; I get a 500 connect refused, even when I hit the IP/port of the Spark UI directly. The logs don't show much. I built Spark with Java 6, Hive, and Scala 2.10 and 2.11. I've tried with and without -Phadoop-provided.

Build command:
./make-distribution.sh --name mapr4.0.2_yarn_j6_2.10 --tgz -Pyarn -Pmapr4 -Phadoop-2.4 -Pmapr4 -Phive -Phadoop-provided -Dhadoop.version=2.5.1-mapr-1501 -Dyarn.version=2.5.1-mapr-1501 -DskipTests -e -X

Logs from spark-shell:
15/06/03 00:10:56 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
15/06/03 00:10:56 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
15/06/03 00:10:56 INFO ui.SparkUI: Started SparkUI at http://172.31.10.14:4040
15/06/03 00:10:57 INFO yarn.Client: Requesting a new application from cluster with 71 NodeManagers
15/06/03 00:10:57 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (112640 MB per container)
15/06/03 00:10:57 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/06/03 00:10:57 INFO yarn.Client: Setting up container launch context for our AM
15/06/03 00:10:57 INFO yarn.Client: Preparing resources for our AM container
15/06/03 00:10:57 INFO yarn.Client: Uploading resource file:///apps/spark/spark-1.4.0-SNAPSHOT-bin-mapr4.0.2_yarn_j6_2.11/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.1-mapr-1501.jar -> maprfs:/user/nw/.sparkStaging/application_1432690361766_0593/spark-assembly-1.4.0-SNAPSHOT-hadoop2.5.1-mapr-1501.jar
15/06/03 00:10:58 INFO yarn.Client: Uploading resource file:/tmp/spark-5e42f904-ff83-4c93-bd35-4c3e20226a8a/__hadoop_conf__983379693214711.zip -> maprfs:/user/nw/.sparkStaging/application_1432690361766_0593/__hadoop_conf__983379693214711.zip
15/06/03 00:10:58 INFO yarn.Client: Setting up the launch environment for our AM container
15/06/03 00:10:58 INFO spark.SecurityManager: Changing view acls to: nw
15/06/03 00:10:58 INFO spark.SecurityManager: Changing modify acls to: nw
15/06/03 00:10:58 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nw); users with modify permissions: Set(nw)
15/06/03 00:10:58 INFO yarn.Client: Submitting application 593 to ResourceManager
15/06/03 00:10:58 INFO security.ExternalTokenManagerFactory: Initialized external token manager class - com.mapr.hadoop.yarn.security.MapRTicketManager
15/06/03 00:10:58 INFO impl.YarnClientImpl: Submitted application application_1432690361766_0593
15/06/03 00:10:59 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED)
15/06/03 00:10:59 INFO yarn.Client:
     client token: N/A
     diagnostics: N/A
     ApplicationMaster host: N/A
     ApplicationMaster RPC port: -1
     queue: default
     start time: 1433290258143
     final status: UNDEFINED
     tracking URL:
http://qtausc-pphd0101.hadoop.local:8088/proxy/application_1432690361766_0593/ user: nw 15/06/03 00:11:00 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED) 15/06/03 00:11:01 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED) 15/06/03 00:11:02 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED) 15/06/03 00:11:03 INFO yarn.Client: Application report for application_1432690361766_0593 (state: ACCEPTED) 15/06/03 00:11:03 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as AkkaRpcEndpointRef(Actor[akka.tcp:// sparkYarnAM@192.168.81.167:36542/user/YarnAM#1631897818]) 15/06/03 00:11:03 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS - qtausc-pphd0167.hadoop.local, PROXY_URI_BASES - http://qtausc-pphd0167.hadoop.local:8088/proxy/application_1432690361766_0593), /proxy/application_1432690361766_0593 15/06/03 00:11:03 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/06/03 00:11:04 INFO yarn.Client: Application report for application_1432690361766_0593 (state: RUNNING) 15/06/03 00:11:04 INFO yarn.Client: client token: N/A diagnostics: N/A
Filter operation to return two RDDs at once.
I want to do this:

val qtSessionsWithQt = rawQtSession.filter(_._2.qualifiedTreatmentId != NULL_VALUE)
val guidUidMapSessions = rawQtSession.filter(_._2.qualifiedTreatmentId == NULL_VALUE)

This will run two different stages. Can this be done in one stage?

val (qtSessionsWithQt, guidUidMapSessions) = rawQtSession.magicFilter(_._2.qualifiedTreatmentId != NULL_VALUE)

-- Deepak
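There is no built-in magicFilter. One common workaround, sketched below with the names from the question, is to cache the parent RDD so both filters at least reuse the same in-memory data rather than recomputing the lineage twice:

// Not a true single-pass split, but it avoids recomputing rawQtSession twice.
val cached = rawQtSession.cache()
val qtSessionsWithQt   = cached.filter(_._2.qualifiedTreatmentId != NULL_VALUE)
val guidUidMapSessions = cached.filter(_._2.qualifiedTreatmentId == NULL_VALUE)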
How to create fewer output files for Spark job ?
I am running a series of Spark functions with 9000 executors and it's resulting in 9000+ files, which is exceeding the namespace file count quota. How can Spark be configured to use CombinedOutputFormat?

{code}
protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
  val writeJob = new Job()
  val schema = SchemaUtil.outputSchema(_detail)
  AvroJob.setOutputKeySchema(writeJob, schema)
  detailRecords.saveAsNewAPIHadoopFile(outputDir,
    classOf[AvroKey[GenericRecord]],
    classOf[org.apache.hadoop.io.NullWritable],
    classOf[AvroKeyOutputFormat[GenericRecord]],
    writeJob.getConfiguration)
}
{code}
-- Deepak
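One way to cut the file count without switching output formats is to shrink the number of write partitions with coalesce just before saving. A sketch of the same method with that change (the target of 500 partitions is an arbitrary example to tune against the quota):

{code}
protected def writeOutputRecords(detailRecords: RDD[(AvroKey[DetailOutputRecord], NullWritable)], outputDir: String) {
  val writeJob = new Job()
  val schema = SchemaUtil.outputSchema(_detail)
  AvroJob.setOutputKeySchema(writeJob, schema)
  detailRecords
    .coalesce(500)  // fewer partitions => fewer part files
    .saveAsNewAPIHadoopFile(outputDir,
      classOf[AvroKey[GenericRecord]],
      classOf[org.apache.hadoop.io.NullWritable],
      classOf[AvroKeyOutputFormat[GenericRecord]],
      writeJob.getConfiguration)
}
{code}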
build jar with all dependencies
hello community, I have built a jar file from my Spark app with Maven (mvn clean compile assembly:single) and the following pom file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>mgm.tp.bigdata</groupId>
  <artifactId>ma-spark</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>ma-spark</name>
  <url>http://maven.apache.org</url>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
  <repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
  </repositories>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.1.0-cdh5.2.5</version>
    </dependency>
    <dependency>
      <groupId>mgm.tp.bigdata</groupId>
      <artifactId>ma-commons</artifactId>
      <version>0.0.1-SNAPSHOT</version>
    </dependency>
  </dependencies>
  <build>
    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>mgm.tp.bigdata.ma_spark.SparkMain</mainClass>
            </manifest>
          </archive>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

If I run my app with java -jar ma-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar on the terminal, I get the following error message:

proewer@proewer-VirtualBox:~/Schreibtisch$ java -jar ma-spark-0.0.1-SNAPSHOT-jar-with-dependencies.jar
2015-Jun-02 12:53:36,348 [main] org.apache.spark.util.Utils WARN - Your hostname, proewer-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface eth0)
2015-Jun-02 12:53:36,350 [main] org.apache.spark.util.Utils WARN - Set SPARK_LOCAL_IP if you need to bind to another address
2015-Jun-02 12:53:36,401 [main] org.apache.spark.SecurityManager INFO - Changing view acls to: proewer
2015-Jun-02 12:53:36,402 [main] org.apache.spark.SecurityManager INFO - Changing modify acls to: proewer
2015-Jun-02 12:53:36,403 [main] org.apache.spark.SecurityManager INFO - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(proewer); users with modify permissions: Set(proewer)
Exception in thread "main" com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.version'
    at com.typesafe.config.impl.SimpleConfig.findKey(SimpleConfig.java:115)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:136)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:142)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:150)
    at com.typesafe.config.impl.SimpleConfig.find(SimpleConfig.java:155)
    at com.typesafe.config.impl.SimpleConfig.getString(SimpleConfig.java:197)
    at akka.actor.ActorSystem$Settings.<init>(ActorSystem.scala:136)
    at akka.actor.ActorSystemImpl.<init>(ActorSystem.scala:470)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:111)
    at akka.actor.ActorSystem$.apply(ActorSystem.scala:104)
    at org.apache.spark.util.AkkaUtils$.org$apache$spark$util$AkkaUtils$$doCreateActorSystem(AkkaUtils.scala:121)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:54)
    at org.apache.spark.util.AkkaUtils$$anonfun$1.apply(AkkaUtils.scala:53)
    at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1454)
    at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
    at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1450)
    at org.apache.spark.util.AkkaUtils$.createActorSystem(AkkaUtils.scala:56)
    at org.apache.spark.SparkEnv$.create(SparkEnv.scala:156)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:203)
    at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:53)
    at mgm.tp.bigdata.ma_spark.SparkMain.main(SparkMain.java:38)

What am I doing wrong? best regards, paul
Re: Spark 1.4.0-rc3: Actor not found
How about other jobs? Is it an executor log, or a driver log? Could you post other logs near this error, please? Thank you. Best Regards, Shixiong Zhu 2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com: Just compiled Spark 1.4.0-rc3 for Yarn 2.2 and tried running a job that worked fine for Spark 1.3. The job starts on the cluster (yarn-cluster mode), initial stage starts, but the job fails before any task succeeds with the following error. Any hints? [ERROR] [06/02/2015 09:05:36.962] [Executor task launch worker-0] [akka.tcp://sparkDriver@10.254.6.15:33986/user/CoarseGrainedScheduler] swallowing exception during message send (akka.remote.RemoteTransportExceptionNoStackTrace) Exception in thread main akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://sparkDriver@10.254.6.15:33986/), Path(/user/OutputCommitCoordinator)] at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:65) at akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:63) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74) at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110) at akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:267) at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.scala:508) at akka.actor.DeadLetterActorRef.specialHandle(ActorRef.scala:541) at akka.actor.DeadLetterActorRef.$bang(ActorRef.scala:531) at akka.remote.RemoteActorRefProvider$RemoteDeadLetterActorRef.$bang(RemoteActorRefProvider.scala:87) at akka.remote.EndpointManager$$anonfun$1.applyOrElse(Remoting.scala:575) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
Shared / NFS filesystems
Hello! I have Spark running in standalone mode, and there are a bunch of worker nodes connected to the master. The workers have a shared file system, but the master node doesn't. Is this something that's not going to work? i.e., should the master node also be on the same shared filesystem mounted on the same path as all of the workers? Thanks! Pradyumna
Re: Shared / NFS filesystems
You can run/submit your code from one of the workers that has access to the file system, and it should be fine, I think. Give it a try. Thanks Best Regards On Tue, Jun 2, 2015 at 3:22 PM, Pradyumna Achar pradyumna.ac...@gmail.com wrote: Hello! I have Spark running in standalone mode, and there are a bunch of worker nodes connected to the master. The workers have a shared file system, but the master node doesn't. Is this something that's not going to work? i.e., should the master node also be on the same shared filesystem mounted on the same path as all of the workers? Thanks! Pradyumna
Re: Spark 1.4.0-rc3: Actor not found
The log is from the log aggregation tool (Hortonworks, yarn logs ...), so it covers both executors and the driver. I'll send a private mail to you with the full logs. Also, I tried another job as you suggested, and it actually worked fine. The first job was reading from a Parquet source, and the second from an Avro source. Could there be some issues with the Parquet reader? Thanks, Anders On Tue, Jun 2, 2015 at 11:53 AM, Shixiong Zhu zsxw...@gmail.com wrote: How about other jobs? Is it an executor log, or a driver log? Could you post other logs near this error, please? Thank you. Best Regards, Shixiong Zhu 2015-06-02 17:11 GMT+08:00 Anders Arpteg arp...@spotify.com: ...
Re: build jar with all dependencies
Can you run using spark-submit? What is happening is that you are running a simple java program -- you've wrapped spark-core in your fat jar, but at runtime you likely need the whole Spark system in order to run your application. I would mark spark-core as provided (so you don't wrap it in your fat jar) and run via spark-submit. If you insist on running via java for whatever reason, see the runtime path that spark-submit sets up and make sure you include all of these jars when you run your app. On Tue, Jun 2, 2015 at 9:57 AM, Pa Rö paul.roewer1...@googlemail.com wrote: okay, but how can I compile my app so it runs without -Dconfig.file=alt_reference1.conf? 2015-06-02 15:43 GMT+02:00 Yana Kadiyska yana.kadiy...@gmail.com: This looks like your app is not finding your Typesafe config. The config should usually be placed in a particular folder under your app to be seen correctly. If it's in a non-standard location you can pass -Dconfig.file=alt_reference1.conf to java to tell it where to look. If this is a config that belongs to Spark and not your app, I'd recommend running your jar via spark-submit (that should run) and dumping out the classpath/variables that spark-submit sets up... On Tue, Jun 2, 2015 at 6:58 AM, Pa Rö paul.roewer1...@googlemail.com wrote: ...
Re: spark sql - reading data from sql tables having space in column names
I would think the easiest way would be to create a view in the DB with column names that have no spaces. In fact, you can pass a SQL query in place of a real table. From the documentation: The JDBC table that should be read. Note that anything that is valid in a `FROM` clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses. Kindly let the community know if this works. On Tue, Jun 2, 2015 at 6:43 PM, Sachin Goyal sachin.go...@jabong.com wrote: Hi, We are using spark sql (1.3.1) to load data from Microsoft SQL Server using jdbc (as described in https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases). It is working fine except when there is a space in column names (we can't modify the schemas to remove spaces as it is a legacy database). Sqoop is able to handle such scenarios by enclosing column names in '[ ]' - the recommended method from Microsoft SQL Server (https://github.com/apache/sqoop/blob/trunk/src/java/org/apache/sqoop/manager/SQLServerManager.java - line 319). Is there a way to handle this in spark sql? Thanks, sachin -- Best Regards, Ayan Guha
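A hedged sketch of that subquery trick against SQL Server from Spark 1.3 (the connection string and the bracketed column name are made up for illustration):

// Alias the bracketed column inside the subquery so Spark only ever sees
// the clean name; SQL Server resolves the brackets, not Spark.
val df = sqlContext.load("jdbc", Map(
  "url" -> "jdbc:sqlserver://host:1433;databaseName=legacy;user=u;password=p",
  "dbtable" -> "(select [customer name] as customer_name from dbo.customers) as t"))
df.printSchema()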
Re: build jar with all dependencies
okay, but how can I compile my app so it runs without -Dconfig.file=alt_reference1.conf? 2015-06-02 15:43 GMT+02:00 Yana Kadiyska yana.kadiy...@gmail.com: This looks like your app is not finding your Typesafe config. The config should usually be placed in a particular folder under your app to be seen correctly. If it's in a non-standard location you can pass -Dconfig.file=alt_reference1.conf to java to tell it where to look. If this is a config that belongs to Spark and not your app, I'd recommend running your jar via spark-submit (that should run) and dumping out the classpath/variables that spark-submit sets up... On Tue, Jun 2, 2015 at 6:58 AM, Pa Rö paul.roewer1...@googlemail.com wrote: ...
Re: build jar with all dependencies
This looks like your app is not finding your Typesafe config. The config should usually be placed in a particular folder under your app to be seen correctly. If it's in a non-standard location you can pass -Dconfig.file=alt_reference1.conf to java to tell it where to look. If this is a config that belongs to Spark and not your app, I'd recommend running your jar via spark-submit (that should run) and dumping out the classpath/variables that spark-submit sets up... On Tue, Jun 2, 2015 at 6:58 AM, Pa Rö paul.roewer1...@googlemail.com wrote: ...
Re: How to read sequence File.
Spark Shell:

val x = sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761", classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])

OR

val x = sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761", classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.LongWritable])

OR

val x = sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761", classOf[org.apache.hadoop.io.LongWritable], classOf[org.apache.hadoop.io.Text])

x.take(10).foreach(println)

is throwing this exception:

15/06/02 05:49:51 ERROR executor.Executor: Exception in task 0.0 in stage 2.0 (TID 2)
java.io.NotSerializableException: org.apache.hadoop.io.Text
Serialization stack:
- object not serializable (class: org.apache.hadoop.io.Text, value: 290090268£2013112699)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.001.0019.00.00.0020.0020.0020.00113NY000-99902000-04-01 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 10)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
15/06/02 05:49:51 ERROR scheduler.TaskSetManager: Task 0.0 in stage 2.0 (TID 2) had a not serializable result: org.apache.hadoop.io.Text
Serialization stack:
- object not serializable (class: org.apache.hadoop.io.Text, value: 290090268£2013112699)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.001.0019.00.00.0020.0020.0020.00113NY000-99902000-04-01 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 10); not retrying
15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
15/06/02 05:49:51 INFO scheduler.TaskSchedulerImpl: Cancelling stage 2
15/06/02 05:49:51 INFO scheduler.DAGScheduler: Stage 2 (take at <console>:24) failed in 0.032 s
15/06/02 05:49:51 INFO scheduler.DAGScheduler: Job 2 failed: take at <console>:24, took 0.041207 s
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 2.0 (TID 2) had a not serializable result: org.apache.hadoop.io.Text
Serialization stack:
- object not serializable (class: org.apache.hadoop.io.Text, value: 290090268£2013112699)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (290090268£2013112699,2900902682000-04-012000-03-221969-12-3110111-9925370390166893734173-9991.001.0019.00.00.0020.0020.0020.00113NY000-99902000-04-01 05:02:21-992000-07-01DW_BATCH2005-01-13 12:09:50DW_BATCH))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 10)
    at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1204) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1193) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1192) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) === ./bin/spark-shell -v --driver-class-path
Re: Compute Median in Spark Dataframe
Like this... sqlContext should be a HiveContext instance:

case class KeyValue(key: Int, value: String)
val df = sc.parallelize(1 to 50).map(i => KeyValue(i, i.toString)).toDF
df.registerTempTable("table")
sqlContext.sql("select percentile(key, 0.5) from table").show()

On Tue, Jun 2, 2015 at 8:07 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: Hi everyone, Is there any way to compute a median on a column using Spark's Dataframe? I know you can use stats in an RDD, but I'd rather stay within a dataframe. Hive seems to imply that using ntile one can compute percentiles, quartiles and therefore a median. Does anyone have experience with this? Regards, Olivier.
Re: How to read sequence File.
Basically, you need to convert it to a serializable format before doing the collect/take. You can fire up a spark shell and paste this:

val sFile = sc.sequenceFile[LongWritable, Text]("/home/akhld/sequence/sigmoid")
  .map(_._2.toString)
sFile.take(5).foreach(println)

Use the attached sequence file generator and the generated sequence file that I used for testing. Also note: if you don't do the .map to convert to String, then it will end up with the serialization exception that you are hitting. Thanks Best Regards On Tue, Jun 2, 2015 at 6:21 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: ...
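Applied to the Text/Text file from the original question, the same fix would look like this (same HDFS path as above):

val x = sc.sequenceFile("/sys/edw/dw_lstg_item/snapshot/2015/06/01/00/part-r-00761",
    classOf[org.apache.hadoop.io.Text], classOf[org.apache.hadoop.io.Text])
  .map { case (k, v) => (k.toString, v.toString) }  // Text is not serializable; String is
x.take(10).foreach(println)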
Transactional guarantee while saving DataFrame into a DB
Hi list, With the help of the Spark DataFrame API we can save a DataFrame into a database table through an insertIntoJDBC() call. However, I could not find any info about how it handles the transactional guarantee. What if my program gets killed during the processing? Would it end up in a partial load? Is it somehow possible to handle these kinds of scenarios? Rollback or something of that sort? Many thanks. P.S.: I am using spark-1.3.1-bin-hadoop2.4 with Java 1.7. Tariq, Mohammad about.me/mti http://about.me/mti
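Spark itself gives no such guarantee here (each partition issues its own inserts), so any atomicity has to come from the database. One defensive pattern, sketched below under the assumption of a plain JDBC-accessible RDBMS and with placeholder table names, is to load into a staging table and promote it in a single server-side transaction after the Spark action succeeds:

import java.sql.DriverManager

// 1) Bulk-load into a staging table; a failure here leaves the real table
//    untouched. `df` and `url` stand in for the DataFrame and JDBC URL.
df.insertIntoJDBC(url, "events_staging", true)

// 2) Promote atomically on the database side once the job has succeeded
//    (the exact SQL depends on the RDBMS).
val conn = DriverManager.getConnection(url)
try {
  conn.setAutoCommit(false)
  val st = conn.createStatement()
  st.execute("DELETE FROM events")
  st.execute("INSERT INTO events SELECT * FROM events_staging")
  conn.commit()
} catch {
  case e: Exception => conn.rollback(); throw e
} finally {
  conn.close()
}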
Re: Compute Median in Spark Dataframe
I've finally come to the same conclusion, but isn't there any way to call these Hive UDAFs from agg, e.g. agg(percentile(key, 0.5))? On Tue, Jun 2, 2015 at 15:37, Yana Kadiyska yana.kadiy...@gmail.com wrote: Like this... sqlContext should be a HiveContext instance: ...
Re: build jar with all dependencies
Which Maven dependencies do I need, too? http://www.cloudera.com/content/cloudera/en/documentation/core/v5-2-x/topics/cdh_vd_cdh5_maven_repo.html On 02.06.2015 at 16:04, Yana Kadiyska wrote: Can you run using spark-submit? What is happening is that you are running a simple java program -- you've wrapped spark-core in your fat jar, but at runtime you likely need the whole Spark system in order to run your application. I would mark spark-core as provided (so you don't wrap it in your fat jar) and run via spark-submit. If you insist on running via java for whatever reason, see the runtime path that spark-submit sets up and make sure you include all of these jars when you run your app. On Tue, Jun 2, 2015 at 9:57 AM, Pa Rö paul.roewer1...@googlemail.com wrote: ...
Re: Compute Median in Spark Dataframe
Not super easily; the GroupedData class uses a strToExpr function which has a pretty limited set of functions, so we can't pass in the name of an arbitrary Hive UDAF (unless I'm missing something). We can instead construct a Column with the expression you want and then pass it in to agg() that way (although then you need to call the Hive UDAF there). There are some private classes in hiveUdfs.scala which expose Hive UDAFs as Spark SQL AggregateExpressions, but they are private. On Tue, Jun 2, 2015 at 8:28 AM, Olivier Girardot o.girar...@lateral-thoughts.com wrote: I've finally come to the same conclusion, but isn't there any way to call these Hive UDAFs from agg, e.g. agg(percentile(key, 0.5))? ... -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
Re: Re: spark 1.3.1 jars in repo1.maven.org
I think this is causing issues upgrading ADAM https://github.com/bigdatagenomics/adam to Spark 1.3.1 (cf. adam#690 https://github.com/bigdatagenomics/adam/pull/690#issuecomment-107769383); attempting to build against Hadoop 1.0.4 yields errors like:

2015-06-02 15:57:44 ERROR Executor:96 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
    at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2015-06-02 15:57:44 WARN TaskSetManager:71 - Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.IncompatibleClassChangeError: Found class org.apache.hadoop.mapreduce.TaskAttemptContext, but interface was expected
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:95)
    at org.apache.spark.SparkHadoopWriter.commit(SparkHadoopWriter.scala:106)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1082)
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$13.apply(PairRDDFunctions.scala:1059)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

TaskAttemptContext is a class in Hadoop 1.0.4, but an interface in Hadoop 2; Spark 1.3.1 expects the interface but is getting the class. It sounds like, while I can depend on Spark 1.3.1 and Hadoop 1.0.4, I then need to hope that I don't exercise certain Spark code paths that run afoul of differences between Hadoop 1 and 2; does that seem correct? Thanks! On Wed, May 20, 2015 at 1:52 PM Sean Owen so...@cloudera.com wrote: I don't think any of those problems are related to Hadoop. Have you looked at userClassPathFirst settings? On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com wrote: Hi Sean and Ted, Thanks for your replies. I don't have our current problems nicely written up as good questions yet. I'm still sorting out classpath issues, etc. In case it is of help, I'm seeing: * Exception in thread Spark Context Cleaner java.lang.NoClassDefFoundError: 0 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149) * We've been having clashing dependencies between a colleague and I because of the aforementioned classpath issue * The clashing dependencies are also causing issues with what jetty libraries are available in the classloader from Spark and don't clash with existing libraries we have.
More anon, Cheers, Edward Original Message Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38 From: Sean Owen so...@cloudera.com To: Edward Sargisson esa...@pobox.com Cc: user user@spark.apache.org Yes, the published artifacts can only refer to one version of anything (OK, modulo publishing a large number of variants under classifiers). You aren't intended to rely on Spark's transitive dependencies for anything. Compiling against the Spark API has no relation to what version of Hadoop it binds against, because that's not part of the API. You can even mark the Spark dependency as provided in your build and get all the Spark/Hadoop bindings at runtime from your cluster. What problem are you experiencing? On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com wrote: Hi, I'd like to confirm an observation I've just made. Specifically, that Spark is only available in repo1.maven.org for one Hadoop variant. The Spark source can be compiled against a number of different Hadoops using profiles. Yay. However, the Spark jars in repo1.maven.org appear to be compiled against one specific Hadoop and no other differentiation is made. (I can see a difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in the version I compiled locally.) The implication here is that if you have a pom file asking for spark-core_2.10 version 1.3.1, then Maven will only give you a Hadoop 2 build.
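To make Sean's point concrete, a minimal build.sbt sketch of the "provided" approach (the version numbers are illustrative, not a recommendation):

  // build.sbt: compile against Spark, but take the Spark/Hadoop jars from the cluster at runtime
  libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"

The same effect in a Maven pom is a provided scope on the spark-core_2.10 dependency.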
updateStateByKey and kafka direct approach without shuffle
Hi all, In my streaming job I'm using the Kafka streaming direct approach and want to maintain state with updateStateByKey. My PairRDD has the message's topic name + partition id as a key. So, I assumed that updateStateByKey could work within the same partition as the KafkaRDD and not lead to shuffles. Actually this is not true, because updateStateByKey leads to a cogroup transformation which thinks that the state RDD and the Kafka RDD are not co-partitioned, as the Kafka RDD does not have a partitioner at all. So the dependency is considered wide and leads to a shuffle. I tried to avoid shuffling by providing a custom partitioner to updateStateByKey, but the KafkaRDD needs to use the same partitioner. For this I created a proxy RDD that just returns my partitioner. class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner) extends RDD[T](prev) { override val partitioner = Some(part) override def compute(split: Partition, context: TaskContext): Iterator[T] = prev.compute(split, context) override protected def getPartitions: Array[Partition] = prev.partitions override def getPreferredLocations(thePart: Partition): Seq[String] = prev.preferredLocations(thePart) } I use it as: val partitioner = new Partitioner { // TODO this should be retrieved from kafka override def numPartitions: Int = 2 override def getPartition(key: Any): Int = { key.asInstanceOf[(String, Int)]._2 } } inputStream .map(m => ((m.topic, m.partition), m.value)) .transform(new ProxyRDDWithPartitioner(_, partitioner)) .updateStateByKey(func, partitioner) The question is - is it safe to do such a trick?
Re: union and reduceByKey wrong shuffle?
Ah, interesting. While working on my new Tungsten shuffle manager, I came up with some nice testing interfaces for allowing me to manually trigger spills in order to deterministically test those code paths without requiring large amounts of data to be shuffled. Maybe I could make similar test interface changes to the existing shuffle code, which might make it easier to reproduce this in an isolated environment. On Mon, Jun 1, 2015 at 11:41 PM, Igor Berman igor.ber...@gmail.com wrote: Hi, small mock data doesn't reproduce the problem. IMHO the problem is reproduced when we make the shuffle big enough to spill data to disk. We will work on it to understand and reproduce the problem (not first priority though...) On 1 June 2015 at 23:02, Josh Rosen rosenvi...@gmail.com wrote: How much work is it to produce a small standalone reproduction? Can you create an Avro file with some mock data, maybe 10 or so records, then reproduce this locally? On Mon, Jun 1, 2015 at 12:31 PM, Igor Berman igor.ber...@gmail.com wrote: Switching to simple POJOs instead of Avro for Spark serialization solved the problem (I mean reading Avro from S3 and then mapping each Avro object to its POJO serializable counterpart with the same fields; the POJO is registered with Kryo). Any thought where to look for a problem/misconfiguration? On 31 May 2015 at 22:48, Igor Berman igor.ber...@gmail.com wrote: Hi We are using Spark 1.3.1, Avro-chill (tomorrow will check if it's important; we register Avro classes from Java), Avro 1.7.6 On May 31, 2015 22:37, Josh Rosen rosenvi...@gmail.com wrote: Which Spark version are you using? I'd like to understand whether this change could be caused by recent Kryo serializer re-use changes in master / Spark 1.4. On Sun, May 31, 2015 at 11:31 AM, igor.berman igor.ber...@gmail.com wrote: After investigation the problem is somehow connected to Avro serialization with Kryo + chill-avro (mapping the Avro object to a simple Scala case class and running reduce on these case class objects solves the problem). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/union-and-reduceByKey-wrong-shuffle-tp23092p23093.html
Re: data localisation in spark
Is it possible in JavaSparkContext? JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile(args[0]); If yes, is it the programmer's responsibility to first calculate the split locations and then instantiate the Spark context with preferred locations? How is this achieved in MR2 with YARN, where the ApplicationMaster specifies split locations to the ResourceManager before acquiring the node managers? On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote: Take a look at the following SparkContext constructor variant that tries to honor the data locality in YARN mode. /** * :: DeveloperApi :: * Alternative constructor for setting preferred locations where Spark will create executors. * * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ @DeveloperApi def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { this(config) this.preferredNodeLocationData = preferredNodeLocationData } -- bit1...@163.com *From:* Shushant Arora shushantaror...@gmail.com *Date:* 2015-05-31 22:54 *To:* user user@spark.apache.org *Subject:* data localisation in spark I want to understand how Spark takes care of data localisation in cluster mode when run on YARN. 1. The driver program asks the ResourceManager for executors. Does it tell YARN's RM to check the HDFS blocks of the input data and then allocate executors accordingly? And do executors remain fixed throughout the application, or does the driver ask for new executors when it submits another job in the same application, since in Spark a new job is created for each action? If executors are fixed, is achieving data localisation for the second job then impossible? 2. When executors are done with their processing, are they marked as free in the ResourceManager's resource queue, and do the executors tell this to the RM directly instead of via the driver? Thanks Shushant
Re: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
My suggestion is that you change the Spark setting which controls the compression codec that Spark uses for internal data transfers: set spark.io.compression.codec to lzf in your SparkConf. On Mon, Jun 1, 2015 at 8:46 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote: Hello Josh, Are you suggesting to store the source data in LZF compression and use the same Spark code as is? Currently it's stored in sequence file format and compressed with GZIP. First line of the data: (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text' org.apache.hadoop.io.compress.GzipCodec?v? ) Regards, Deepak On Tue, Jun 2, 2015 at 4:16 AM, Josh Rosen rosenvi...@gmail.com wrote: If you can't run a patched Spark version, then you could also consider using LZF compression instead, since that codec isn't affected by this bug. On Mon, Jun 1, 2015 at 3:32 PM, Andrew Or and...@databricks.com wrote: Hi Deepak, This is a notorious bug that is being tracked at https://issues.apache.org/jira/browse/SPARK-4105. We have fixed one source of this bug (it turns out Snappy had a bug in buffer reuse that caused data corruption). There are other known sources that are being addressed in outstanding patches currently. Since you're using 1.3.1, my guess is that you don't have this patch: https://github.com/apache/spark/pull/6176, which I believe should fix the issue in your case. It's merged for 1.3.2 (not yet released) but not in time for 1.3.1, so feel free to patch it yourself and see if it works. -Andrew 2015-06-01 8:00 GMT-07:00 ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com: Any suggestions? I am using Spark 1.3.1 to read a sequence file stored in SequenceFile format (SEQorg.apache.hadoop.io.Textorg.apache.hadoop.io.Text'org.apache.hadoop.io.compress.GzipCodec?v? ) with this code and settings sc.sequenceFile(dwTable, classOf[Text], classOf[Text]).partitionBy(new org.apache.spark.HashPartitioner(2053)) .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryoserializer.buffer.mb", arguments.get("buffersize").get) .set("spark.kryoserializer.buffer.max.mb", arguments.get("maxbuffersize").get) .set("spark.driver.maxResultSize", arguments.get("maxResultSize").get) .set("spark.yarn.maxAppAttempts", "0") //.set("spark.akka.askTimeout", arguments.get("askTimeout").get) //.set("spark.akka.timeout", arguments.get("akkaTimeout").get) //.set("spark.worker.timeout", arguments.get("workerTimeout").get) .registerKryoClasses(Array(classOf[com.ebay.ep.poc.spark.reporting.process.model.dw.SpsLevelMetricSum])) and values are buffersize=128 maxbuffersize=1068 maxResultSize=200G And I see this exception in each executor task FetchFailed(BlockManagerId(278, phxaishdc9dn1830.stratus.phx.ebay.com, 54757), shuffleId=6, mapId=2810, reduceId=1117, message= org.apache.spark.shuffle.FetchFailedException: FAILED_TO_UNCOMPRESS(5) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:248) at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:172) at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:79) at org.apache.spark.rdd.RDD.iterator(RDD.scala:242) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:64) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) *Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)* at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:84) at org.xerial.snappy.SnappyNative.rawUncompress(Native Method) at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:444) at org.xerial.snappy.Snappy.uncompress(Snappy.java:480) at org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:135) at
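For completeness, the workaround Josh describes at the top of this thread is a one-line SparkConf change; a minimal sketch (the rest of the conf is whatever your job already sets):

  import org.apache.spark.SparkConf
  // SPARK-4105 workaround: avoid the snappy buffer-reuse bug by using lzf for Spark's internal transfers
  val conf = new SparkConf().set("spark.io.compression.codec", "lzf")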
Re: Spark 1.3.0: how to let Spark history load old records?
I think Spark doesn't keep historical metrics. You can use something like SPM for that - http://blog.sematext.com/2014/01/30/announcement-apache-storm-monitoring-in-spm/ Otis -- Monitoring * Alerting * Anomaly Detection * Centralized Log Management Solr Elasticsearch Support * http://sematext.com/ On Mon, Jun 1, 2015 at 11:36 PM, Haopu Wang hw...@qilinsoft.com wrote: When I start the Spark master process, the old records are not shown in the monitoring UI. How to show the old records? Thank you very much!
Scala By the Bay + Big Data Scala 2015 Program Announced
The programs and schedules for Scala By the Bay (SBTB) and Big Data Scala By the Bay (BDS) 2015 conferences are announced and published: Scala By the Bay http://scala.bythebay.io — August 13-16 ( scala.bythebay.io) Big Data Scala By the Bay http://bigdatascala.bythebay.io — August 16-18 ( bigdatascala.bythebay.io) There are 77 of the best talks from the leading companies using Scala, Spark, and other Scala-based projects in production, including Twitter, Salesforce, Cloudera, Verizon, Comcast, Spotify, Hootsuite, Typesafe, Databricks, Nitro, Liveperson, Tableau, and many others. SBTB + BDS Schedule http://scala.bythebay.io/schedule.html SBTB and BDS are separate conferences, with BDS expanding into data science and data management. They share an innovative end-to-end pipeline training on 8/16, when in one day, we’ll teach hundreds of developers to build a web-scale startup on Mesos, Akka, Kafka, Spark, and Cassandra, taught by engineers from Mesosphere, Typesafe, Confluent, Databricks, and DataStax, respectively. For the first time in the history of any of the Scala conferences, Twitter adds a whole Finagle Day to SBTB, teaching OSS developers the biggest real-time Scala stack in production via hands-on workshops taught by Twitter engineers, and a series of talks from Finagle creators and users inside and outside Twitter. SBTB+BDS topics include higher-order abstractions for multiple application areas, data pipelines, “big” data analytics, Machine Learning and Natural Language Processing, datacenter management with Mesos, and more. The key themes unifying both conferences are applying rigorous Functional Programming principles for DRY and elegant codebases that can grow with smart teams as companies go web-scale, and the emergence of Reactive Systems that replace ETL with common object models applied across all stages of an application, from API to message bus to real-time analytics. Several versions of the resulting “lambda architectures” will be presented. Martin Odersky, the creator of Scala, will keynote the By the Bay conferences for the first time. Jonas Bonér, the CTO of Typesafe, is developing a completely new talk for Scala By the Bay, which he will keynote together with Dean Wampler, Dick Wall, Vidhya Narayanan, and Andrew Headrick. Vidhya leads the Verizon OnCue Scala team, with one of the highest concentrations of FP talent in the world, and Andrew, formerly the Akka architect at Ticketfly, is now the CEO of InnoVint, a local startup managing wineries with Scala — proving we have the most fun local Scala conference. Special thanks go to Cloudera, who crystallized the Big Data Scala conference, and whose Chairman and Chief Strategy Officer, Mike Olson, will keynote it together with Martin Odersky, Matei Zaharia, Jay Kreps, and Debora Donato. Cloudera is backing Scala and Spark across the two key themes of Big Data Scala — end-to-end data pipelines in Scala and Data Science on the JVM. Big Data Scala includes several topics which are the focus of the SF Text, Text By the Bay, and SF Spark and Friends communities ( sftext.org, text.bythebay.io, sfspark.org, respectively). We had a record number of submissions this year and had to make tough choices to keep the conferences to two tracks each. These are the biggest and the best Scala By the Bay conferences we’ve put together so far. Given the program is finally published, we’re pushing back late bird to June 15th.
We have capacity for about 400 seats, and a significant portion was already claimed even before the schedule was announced, mostly by folks returning from the previous years. All the previous By the Bay conferences sold out, and these will sell out quickly this time too, so reserve your seat soon. We're welcoming sponsors, old and new, who get a block of seats as well -- email spons...@scalabythebay.org for a prospectus. All 18 sponsors of the 2014 edition were hiring. We have special programs for non-profits and making-the-world-better kinds of projects; email organiz...@scalabythebay.org or organiz...@bigdatascala.org if you use Scala for Good and want to attend SBTB+BDS, and get community support for your projects and teams (even teams of one!). See you on the shores of Lake Merritt in August! A+ SBTB+BDS
Embedding your own transformer in a Spark.ml Pipeline
Hi, I would like to embed my own transformer in a Spark.ml Pipeline but do not see an example of it. Can someone share an example of which classes/interfaces I need to extend/implement in order to do so? Thanks. Dimple -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
Re: data localisation in spark
It is not possible with JavaSparkContext either. The API mentioned below currently does not have any effect (we should document this). The primary difference between MR and Spark here is that MR runs each task in its own YARN container, while Spark runs multiple tasks within an executor, which needs to be requested before Spark knows what tasks it will run, although dynamic allocation improves that last part. -Sandy On Tue, Jun 2, 2015 at 9:55 AM, Shushant Arora shushantaror...@gmail.com wrote: Is it possible in JavaSparkContext? JavaSparkContext jsc = new JavaSparkContext(conf); JavaRDD<String> lines = jsc.textFile(args[0]); If yes, is it the programmer's responsibility to first calculate the split locations and then instantiate the Spark context with preferred locations? How is this achieved in MR2 with YARN, where the ApplicationMaster specifies split locations to the ResourceManager before acquiring the node managers? On Mon, Jun 1, 2015 at 7:24 AM, bit1...@163.com bit1...@163.com wrote: Take a look at the following SparkContext constructor variant that tries to honor the data locality in YARN mode. /** * :: DeveloperApi :: * Alternative constructor for setting preferred locations where Spark will create executors. * * @param preferredNodeLocationData used in YARN mode to select nodes to launch containers on. * Can be generated using [[org.apache.spark.scheduler.InputFormatInfo.computePreferredLocations]] * from a list of input files or InputFormats for the application. */ @DeveloperApi def this(config: SparkConf, preferredNodeLocationData: Map[String, Set[SplitInfo]]) = { this(config) this.preferredNodeLocationData = preferredNodeLocationData } -- bit1...@163.com *From:* Shushant Arora shushantaror...@gmail.com *Date:* 2015-05-31 22:54 *To:* user user@spark.apache.org *Subject:* data localisation in spark I want to understand how Spark takes care of data localisation in cluster mode when run on YARN. 1. The driver program asks the ResourceManager for executors. Does it tell YARN's RM to check the HDFS blocks of the input data and then allocate executors accordingly? And do executors remain fixed throughout the application, or does the driver ask for new executors when it submits another job in the same application, since in Spark a new job is created for each action? If executors are fixed, is achieving data localisation for the second job then impossible? 2. When executors are done with their processing, are they marked as free in the ResourceManager's resource queue, and do the executors tell this to the RM directly instead of via the driver? Thanks Shushant
Re: updateStateByKey and kafka direct approach without shuffle
Cody, Thanks, good point. I fixed getting the partition id to: class MyPartitioner(offsetRanges: Array[OffsetRange]) extends Partitioner { override def numPartitions: Int = offsetRanges.size override def getPartition(key: Any): Int = { // this is set in .map(m => (TaskContext.get().partitionId(), m.value)) key.asInstanceOf[Int] } } inputStream .map(m => (TaskContext.get().partitionId(), m.value)) .transform { rdd => val part = new MyPartitioner(rdd.asInstanceOf[HasOffsetRanges].offsetRanges) new ProxyRDDWithPartitioner(rdd, part) } ... But how can I create the same partitioner during the updateStateByKey call? I have no idea how to access the RDD when calling updateStateByKey. On Tue, Jun 2, 2015 at 7:15 PM, Cody Koeninger c...@koeninger.org wrote: I think the general idea is worth pursuing. However, this line: override def getPartition(key: Any): Int = { key.asInstanceOf[(String, Int)]._2 } is using the Kafka partition id, not the Spark partition index, so it's going to give you fewer partitions / an incorrect index. Cast the rdd to HasOffsetRanges and get the offsetRanges from it. The index into the offset range array matches the (Spark) partition id. That will also tell you what the value of numPartitions should be. On Tue, Jun 2, 2015 at 11:50 AM, Krot Viacheslav krot.vyaches...@gmail.com wrote: Hi all, In my streaming job I'm using the Kafka streaming direct approach and want to maintain state with updateStateByKey. My PairRDD has the message's topic name + partition id as a key. So, I assumed that updateStateByKey could work within the same partition as the KafkaRDD and not lead to shuffles. Actually this is not true, because updateStateByKey leads to a cogroup transformation which thinks that the state RDD and the Kafka RDD are not co-partitioned, as the Kafka RDD does not have a partitioner at all. So the dependency is considered wide and leads to a shuffle. I tried to avoid shuffling by providing a custom partitioner to updateStateByKey, but the KafkaRDD needs to use the same partitioner. For this I created a proxy RDD that just returns my partitioner. class ProxyRDDWithPartitioner[T: ClassTag](prev: RDD[T], part: Partitioner) extends RDD[T](prev) { override val partitioner = Some(part) override def compute(split: Partition, context: TaskContext): Iterator[T] = prev.compute(split, context) override protected def getPartitions: Array[Partition] = prev.partitions override def getPreferredLocations(thePart: Partition): Seq[String] = prev.preferredLocations(thePart) } I use it as: val partitioner = new Partitioner { // TODO this should be retrieved from kafka override def numPartitions: Int = 2 override def getPartition(key: Any): Int = { key.asInstanceOf[(String, Int)]._2 } } inputStream .map(m => ((m.topic, m.partition), m.value)) .transform(new ProxyRDDWithPartitioner(_, partitioner)) .updateStateByKey(func, partitioner) The question is - is it safe to do such a trick?
Re: Embedding your own transformer in a Spark.ml Pipeline
Hi, We are in the process of adding examples for feature transformations (https://issues.apache.org/jira/browse/SPARK-7546) and this should be available shortly on Spark master. In the meanwhile, the best place to start would be to look at how the Tokenizer works here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/feature/Tokenizer.scala You need to implement the Transformer interface as above; in this case a UnaryTransformer, since the feature transformer acts on one column, transforms it, and outputs another column. And here is an example of how to build a pipeline that includes a feature transformer (the HashingTF is the feature transformer analogous to what you would build): https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala But stay tuned, we should have examples in Python, Scala and Java soon. Ram On Tue, Jun 2, 2015 at 10:19 AM, dimple dimp201...@gmail.com wrote: Hi, I would like to embed my own transformer in a Spark.ml Pipeline but do not see an example of it. Can someone share an example of which classes/interfaces I need to extend/implement in order to do so? Thanks. Dimple -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Embedding-your-own-transformer-in-Spark-ml-Pipleline-tp23112.html
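To give a feel for the shape of such a transformer, here is a hedged sketch modeled on the master-branch Tokenizer (1.4-era API; the exact signatures differed slightly in 1.3, and UpperCaser is a made-up example, so treat this as the shape rather than a drop-in):

  import org.apache.spark.ml.UnaryTransformer
  import org.apache.spark.ml.param.ParamMap
  import org.apache.spark.sql.types.{DataType, StringType}

  // a one-column feature transformer: reads a String column, writes its upper-cased form
  class UpperCaser(override val uid: String)
      extends UnaryTransformer[String, String, UpperCaser] {
    // the per-row function applied to the input column
    override protected def createTransformFunc: String => String = _.toUpperCase
    // the type of the generated output column
    override protected def outputDataType: DataType = StringType
    override def copy(extra: ParamMap): UpperCaser = defaultCopy(extra)
  }

An instance configured with setInputCol/setOutputCol can then sit in a Pipeline next to stages like HashingTF.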
Join highly skewed datasets
We use Scoobi + MR to perform joins, and we particularly use the blockJoin() API of Scoobi: /** Perform an equijoin with another distributed list where this list is considerably smaller * than the right (but too large to fit in memory), and where the keys of right may be * particularly skewed. */ def blockJoin[B : WireFormat](right: DList[(K, B)]): DList[(K, (A, B))] = Relational.blockJoin(left, right) I am trying to do a POC; which Spark join API(s) are recommended to achieve something similar? Please suggest. -- Deepak
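Core Spark (as of 1.3) has no direct blockJoin() equivalent, but one common hand-rolled analogue is to salt the skewed side and replicate the smaller side. A sketch under assumed inputs small: RDD[(K, A)] and large: RDD[(K, B)], with the replication factor N picked purely for illustration:

  import scala.util.Random

  val N = 10 // replication factor; tune to the observed skew (assumption)
  // replicate every record of the small side across all N salt values
  val replicatedSmall = small.flatMap { case (k, v) => (0 until N).map(i => ((k, i), v)) }
  // tag each record of the large, skewed side with a random salt so a hot key spreads over N tasks
  val saltedLarge = large.map { case (k, v) => ((k, Random.nextInt(N)), v) }
  // join on the salted key, then drop the salt
  val joined = replicatedSmall.join(saltedLarge).map { case ((k, _), pair) => (k, pair) }

This trades an N-fold replication of the small side for an even spread of the skewed keys, which is roughly the trade blockJoin makes.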
SparkSQL: How to specify replication factor on the persisted parquet files?
Hi, I'm trying to save a SparkSQL DataFrame to a persistent Hive table using the default parquet data source. I don't know how to change the replication factor of the generated parquet files on HDFS. I tried to set dfs.replication on the HiveContext but that didn't work. Any suggestions are appreciated very much!
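One untested avenue (an assumption, not a confirmed fix): dfs.replication is a client-side HDFS setting, so setting it on the SparkContext's Hadoop configuration before the write may be picked up by the HDFS client that creates the files:

  // sketch: ask the HDFS client to create files with replication factor 2 (value is illustrative)
  sc.hadoopConfiguration.set("dfs.replication", "2")

Whether the 1.3 parquet data source respects this for all of its output paths is not confirmed here.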
Re: union and reduceByKey wrong shuffle?
Hi, small mock data doesn't reproduce the problem. IMHO the problem is reproduced when we make the shuffle big enough to spill data to disk. We will work on it to understand and reproduce the problem (not first priority though...) On 1 June 2015 at 23:02, Josh Rosen rosenvi...@gmail.com wrote: How much work is it to produce a small standalone reproduction? Can you create an Avro file with some mock data, maybe 10 or so records, then reproduce this locally? On Mon, Jun 1, 2015 at 12:31 PM, Igor Berman igor.ber...@gmail.com wrote: Switching to simple POJOs instead of Avro for Spark serialization solved the problem (I mean reading Avro from S3 and then mapping each Avro object to its POJO serializable counterpart with the same fields; the POJO is registered with Kryo). Any thought where to look for a problem/misconfiguration? On 31 May 2015 at 22:48, Igor Berman igor.ber...@gmail.com wrote: Hi We are using Spark 1.3.1, Avro-chill (tomorrow will check if it's important; we register Avro classes from Java), Avro 1.7.6 On May 31, 2015 22:37, Josh Rosen rosenvi...@gmail.com wrote: Which Spark version are you using? I'd like to understand whether this change could be caused by recent Kryo serializer re-use changes in master / Spark 1.4. On Sun, May 31, 2015 at 11:31 AM, igor.berman igor.ber...@gmail.com wrote: After investigation the problem is somehow connected to Avro serialization with Kryo + chill-avro (mapping the Avro object to a simple Scala case class and running reduce on these case class objects solves the problem). -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/union-and-reduceByKey-wrong-shuffle-tp23092p23093.html