Re: How to implement an Evaluator for a ML pipeline?
Thanks, Xiangrui, for clarifying the metric and creating that JIRA issue. I made an error while composing my earlier mail: paramMap.get(als.regParam) in my Evaluator actually returns None. I just happened to use getOrElse(1.0) in my tests, which explains why negating the metric did not change anything. -Stefan PS: I got an error while sending my previous mail via the web interface, and did not think it got through to the list. So I did not follow up on my problem myself. Sorry for the confusion. On 19.05.2015 at 21:54, Xiangrui Meng wrote: The documentation needs to be updated to state that higher metric values are better (https://issues.apache.org/jira/browse/SPARK-7740). I don't know why if you negate the return value of the Evaluator you still get the highest regularization parameter candidate. Maybe you should check the log messages from CrossValidator and see the average metric values during cross validation. -Xiangrui On Sat, May 9, 2015 at 12:15 PM, Stefan H. twel...@gmx.de wrote: Hello everyone, I am stuck with the (experimental, I think) API for machine learning pipelines. I have a pipeline with just one estimator (ALS) and I want it to try different values for the regularization parameter. Therefore I need to supply an Evaluator that returns a value of type Double. I guess this could be something like accuracy or mean squared error? The only implementation I found is BinaryClassificationEvaluator, and I did not understand the computation there. I could not find detailed documentation, so I implemented a dummy Evaluator that just returns the regularization parameter: new Evaluator { def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = paramMap.get(als.regParam).getOrElse(throw new Exception) } I just wanted to see whether the lower or higher value wins. On the resulting model I inspected the chosen regularization parameter this way: cvModel.bestModel.fittingParamMap.get(als.regParam) And it was the highest of my three regularization parameter candidates. The strange thing is, if I negate the return value of the Evaluator, that line still returns the highest regularization parameter candidate. So I am probably working with false assumptions. I'd be grateful if someone could point me to some documentation or examples, or has a few hints to share. Cheers, Stefan
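For readers who land on this thread looking for a non-dummy Evaluator: a minimal RMSE-based sketch against the same evaluate(dataset, paramMap) signature used above might look like the following. It assumes the transformed dataset carries a "rating" label column and the "prediction" column that ALS adds (both assumptions, not stated in the thread), and it negates the error because CrossValidator picks the model with the highest metric.

    import org.apache.spark.ml.Evaluator
    import org.apache.spark.ml.param.ParamMap
    import org.apache.spark.sql.DataFrame

    val rmseEvaluator = new Evaluator {
      def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = {
        // assumed column names: "rating" (label) and "prediction" (ALS output)
        val rmse = math.sqrt(
          dataset.selectExpr("cast(rating as double)", "cast(prediction as double)")
            .map(row => { val d = row.getDouble(0) - row.getDouble(1); d * d })
            .mean())
        -rmse  // higher is better for CrossValidator, so return the negated error
      }
    }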
Re: Spark Streaming to Kafka
Thanks Saisai. On Wed, May 20, 2015 at 11:23 AM, Saisai Shao sai.sai.s...@gmail.com wrote: I think this is the PR https://github.com/apache/spark/pull/2994 you could refer to. 2015-05-20 13:41 GMT+08:00 twinkle sachdeva twinkle.sachd...@gmail.com: Hi, Since Spark Streaming is being nicely integrated with consuming messages from Kafka, I thought of asking the forum: is there any implementation available for pushing data to Kafka from Spark Streaming too? Any link(s) will be helpful. Thanks and Regards, Twinkle
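For reference, a common hand-rolled pattern for writing a DStream out to Kafka (a sketch only, not the implementation in the linked PR) is to open a producer per partition inside foreachRDD. Here `stream` is an assumed DStream, and the broker address and topic name are placeholders:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { records =>
        val props = new Properties()
        props.put("bootstrap.servers", "broker1:9092")  // placeholder broker list
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        // one producer per partition per batch; a pooled/lazily shared producer is more efficient
        val producer = new KafkaProducer[String, String](props)
        records.foreach(r => producer.send(new ProducerRecord[String, String]("output-topic", r.toString)))
        producer.close()
      }
    }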
Re: spark streaming doubt
One receiver basically runs on 1 core, so if your single node has 4 cores, there are still 3 cores left for the processing (for executors). And yes, the receiver remains on the same machine unless some failure happens. Thanks Best Regards On Tue, May 19, 2015 at 10:57 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil and Dibyendu. In high-level receiver-based streaming, do executors run on the receiver nodes themselves to get data locality? Or is the data always transferred to executor nodes, with the executor nodes differing in each run of the job but the receiver node remaining the same (same machines) throughout the life of the streaming application unless a node failure happens? On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Just to add, there is a Receiver based Kafka consumer which uses the Kafka Low Level Consumer API. http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Regards, Dibyendu On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote: So for Kafka+spark streaming, receiver-based streaming uses the high-level API and non-receiver-based streaming uses the low-level API. 1. In high-level receiver-based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say at each second)? - Receiver based streaming will always have the receiver running in parallel while your job is running, so by default for every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka. 2. Will the number of executors in high-level receiver-based jobs always equal the number of partitions in the topic? - Not sure where you came up with this. For the non stream based one, I think the number of partitions in spark will be equal to the number of kafka partitions for the given topic. 3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign to executors in the high-level receiver-based approach? - They will consume the data in parallel. For the receiver based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages. On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: spark.streaming.concurrentJobs takes an integer value, not boolean. If you set it to 2 then 2 jobs will run in parallel. Default value is 1 and the next job will start once it completes the current one. Actually, in the current implementation of Spark Streaming and under default configuration, only one job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then the next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs which is by default set to 1. It's not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources, which can make it hard to debug whether there are sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time < batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future.
Copied from TD's answer written on SO http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Examples of non-receiver-based streaming are the fileStream and directStream ones. You can read a bit of information from here https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html Thanks Best Regards On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil. When I don't set spark.streaming.concurrentJobs to true, will all pending jobs start one by one after one job completes, or does it not create jobs that could not be started at their desired interval? And what's the difference and usage of receiver vs non-receiver based streaming? Is there any documentation for that? On Tue, May 19, 2015 at 1:35 PM, Akhil Das ak...@sigmoidanalytics.com wrote: It will be a single job running at a time by default (you can also configure spark.streaming.concurrentJobs to run jobs in parallel, which is not recommended in production). Now, your batch duration being 1 sec and processing time being 2 minutes, if you are using receiver-based streaming then ideally those receivers will keep on receiving data while the job is running (which will accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in block not found exceptions as spark drops
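For reference, the experimental property discussed in this thread is set like any other Spark configuration entry (a minimal sketch):

    import org.apache.spark.SparkConf

    // allow up to 2 streaming jobs to execute concurrently (default is 1)
    val conf = new SparkConf()
      .setAppName("StreamingApp")
      .set("spark.streaming.concurrentJobs", "2")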
Re: Mesos Spark Tasks - Lost
Can you share your exact spark-submit command line? And also cluster mode is not yet released yet (1.4) and doesn't support spark-shell, so I think you're just using client mode unless you're using latest master. Tim On Tue, May 19, 2015 at 8:57 AM, Panagiotis Garefalakis panga...@gmail.com wrote: Hello all, I am facing a weird issue for the last couple of days running Spark on top of Mesos and I need your help. I am running Mesos in a private cluster and managed to deploy successfully hdfs, cassandra, marathon and play but Spark is not working for a reason. I have tried so far: different java versions (1.6 and 1.7 oracle and openjdk), different spark-env configuration, different Spark versions (from 0.8.8 to 1.3.1), different HDFS versions (hadoop 5.1 and 4.6), and updating pom dependencies. More specifically while local tasks complete fine, in cluster mode all the tasks get lost. (both using spark-shell and spark-submit) From the worker log I see something like this: --- I0519 02:36:30.475064 12863 fetcher.cpp:214] Fetching URI 'hdfs:/:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' I0519 02:36:30.747372 12863 fetcher.cpp:99] Fetching URI 'hdfs://X:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' using Hadoop Client I0519 02:36:30.747546 12863 fetcher.cpp:109] Downloading resource from 'hdfs://:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' to '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' I0519 02:36:34.205878 12863 fetcher.cpp:78] Extracted resource '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' into '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3' *Error: Could not find or load main class two* --- And from the Spark Terminal: --- 15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0 15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled 15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at SparkPi.scala:35 15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at SparkPi.scala:35 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 26, ): ExecutorLostFailure (executor lost) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org http://org.apache.spark.scheduler.dagscheduler.org/$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) .. at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) --- Any help will be greatly appreciated! Regards, Panagiotis
Re: Hive on Spark VS Spark SQL
I don't think that's quite the difference. Any SQL engine has a query planner and an execution engine. Both of these use Spark for execution. HoS uses Hive for query planning. Although it's not optimized for execution on Spark per se, it's got a lot of language support and is stable/mature. Spark SQL's query planner is less developed at this point but purpose-built for Spark as an execution engine. Spark SQL is also how you put SQL-like operations in a Spark program -- programmatic SQL if you will -- which isn't what Hive or therefore HoS does. HoS is good if you're already using Hive and need its language features and need it as it works today, and want a faster batch execution version of it. On Wed, May 20, 2015 at 7:18 AM, Debasish Das debasish.da...@gmail.com wrote: SparkSQL was built to improve upon the Hive on Spark runtime further... On Tue, May 19, 2015 at 10:37 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Between Hive on Spark and SparkSQL, which should be better, and what are the key characteristics and the advantages and the disadvantages of each? -- guoqing0...@yahoo.com.hk
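As a concrete illustration of the "programmatic SQL" point above, here is a minimal sketch using the Spark 1.3-era API; the input file and column names are made up:

    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    val people = sqlContext.jsonFile("people.json")   // hypothetical input file
    people.registerTempTable("people")
    // the same query expressed as SQL text and as DataFrame operations
    val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 18")
    val adultsDF = people.filter(people("age") >= 18).select("name")
    adults.show()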
Is this a good use case for Spark?
Hi all, I'm new to Spark -- so new that we're deciding whether to use it in the first place, and I was hoping someone here could help me figure that out. We're doing a lot of processing of legal documents -- in particular, the entire corpus of American law. It's about 10m documents, many of which are quite large as far as text goes (100s of pages). We'd like to (a) transform these documents from the various (often borked) formats they come to us in into a standard XML format, (b) when it is in a standard format, extract information from them (e.g., which judicial cases cite each other?) and annotate the documents with the information extracted, and then (c) deliver the end result to a repository (like s3) where it can be accessed by the user-facing application. Of course, we'd also like to do all of this quickly -- optimally, running the entire database through the whole pipeline in a few hours. We currently use a mix of Python and Java scripts (including XSLT, and NLP/unstructured data tools like UIMA and Stanford's CoreNLP) in various places along the pipeline we built for ourselves to handle these tasks. The current pipeline infrastructure was built a while back -- it's basically a number of HTTP servers that each have a single task and pass the document along from server to server as it goes through the processing pipeline. It's great although it's having trouble scaling, and there are some reliability issues. It's also a headache to handle all the infrastructure. For what it's worth, metadata about the documents resides in SQL, and the actual text of the documents lives in s3. It seems like Spark would be ideal for this, but after some searching I wasn't able to find too many examples of people using it for document-processing tasks (like transforming documents from one XML format into another) and I'm not clear if I can chain those sorts of tasks and NLP tasks, especially if some happen in Python and others in Java. Finally, I don't know if the size of the data (i.e., we'll likely want to run operations on whole documents, rather than just lines) imposes issues/constraints. Thanks all! Jake
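A rough sketch of the whole-document pattern being described here: each document is handled as a single (path, content) record rather than line by line. The S3 paths are placeholders and the two helper functions are hypothetical stand-ins for the XML conversion and citation annotation steps.

    // placeholder transforms standing in for the real XSLT/NLP steps
    def convertToStandardXml(raw: String): String = raw
    def annotateCitations(xml: String): String = xml

    val docs = sc.wholeTextFiles("s3n://my-bucket/raw-docs/")              // (path, fullText) pairs
    val standardized = docs.mapValues(raw => convertToStandardXml(raw))     // (a) normalize format
    val annotated = standardized.mapValues(xml => annotateCitations(xml))   // (b) extract/annotate
    annotated.saveAsSequenceFile("s3n://my-bucket/processed-docs/")         // (c) deliver results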
Re: spark 1.3.1 jars in repo1.maven.org
Yes, the published artifacts can only refer to one version of anything (OK, modulo publishing a large number of variants under classifiers). You aren't intended to rely on Spark's transitive dependencies for anything. Compiling against the Spark API has no relation to what version of Hadoop it binds against because it's not part of any API. You can even mark the Spark dependency as provided in your build and get all the Spark/Hadoop bindings at runtime from your cluster. What problem are you experiencing? On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com wrote: Hi, I'd like to confirm an observation I've just made. Specifically that Spark is only available in repo1.maven.org for one Hadoop variant. The Spark source can be compiled against a number of different Hadoops using profiles. Yay. However, the Spark jars in repo1.maven.org appear to be compiled against one specific Hadoop and no other differentiation is made. (I can see a difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in the version I compiled locally). The implication here is that if you have a pom file asking for spark-core_2.10 version 1.3.1 then Maven will only give you a Hadoop 2 version. Maven assumes that non-snapshot artifacts never change, so trying to load a Hadoop 1 version will end in tears. This then means that if you compile code against spark-core then there will probably be classpath NoClassDefFound issues unless the Hadoop 2 version is exactly the one you want. Have I gotten this correct? It happens that our little app is using a Spark context directly from a Jetty webapp and the classpath differences were/are causing some confusion. We are currently installing a Hadoop 1 spark master and worker. Thanks a lot! Edward
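In sbt terms, the "provided" advice above looks something like this (a sketch; the same idea applies to a Maven dependency with scope "provided"):

    // build.sbt -- compile against the published artifact, but take the actual
    // Spark (and its Hadoop binding) from the cluster at runtime
    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"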
Re: Multi user setup and saving a DataFrame / RDD to a network exported file system
Hi, thanks for the answer. The rights are drwxr-xr-x 3 tfruboes all 5632 05-19 15:40 test19EE/ I have tried setting the rights to 777 for this directory prior to execution. This does not get propagated down the chain, ie the directory created as a result of the save call (namesAndAges.parquet2 in the path in the dump [1] below) is created with the drwxr-xr-x rights (owned by the user submitting the job, ie tfruboes). The temp directories created inside namesAndAges.parquet2/_temporary/0/ (e.g. task_201505200920_0009_r_01) are owned by root, again with drwxr-xr-x access rights. Cheers, Tomasz On 19.05.2015 at 23:56, Davies Liu wrote: It surprises me, could you list the owner information of /mnt/lustre/bigdata/med_home/tmp/test19EE/ ? On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl wrote: Dear Experts, we have a spark cluster (standalone mode) in which master and workers are started from the root account. Everything runs correctly to the point when we try doing operations such as dataFrame.select("name", "age").save(ofile, "parquet") or rdd.saveAsPickleFile(ofile), where ofile is a path on a network-exported filesystem (visible on all nodes; in our case this is lustre, I guess on NFS the effect would be similar). Unsurprisingly temp files created on workers are owned by root, which then leads to a crash (see [1] below). Is there a solution/workaround for this (e.g. controlling file creation mode of the temporary files)? Cheers, Tomasz ps I've tried to google this problem, found a couple of similar reports, but no clear answer/solution ps2 For completeness - running master/workers as a regular user solves the problem only for the given user. For other users submitting to this master the result is given in [2] below [0] Cluster details: Master/workers: centos 6.5 Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build) [1] ## File /mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o27.save.
: java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet; isDirectory=false; length=534; replication=1; blocksize=33554432; modification_time=1432042832000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1181) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) ## [2] ## 15/05/19 14:45:19 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, wn23023.cis.gov.pl): java.io.IOException: Mkdirs failed to create file:/mnt/lustre/bigdata/med_home/tmp/test18/namesAndAges.parquet2/_temporary/0/_temporary/attempt_201505191445_0009_r_00_0 at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438) at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424) at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906) at
java program Get Stuck at broadcasting
Hi All, The variable I need to broadcast is just 468 MB. When broadcasting, it just “stop” at here: * 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 15/05/20 11:36:14 INFO mapred.FileInputFormat: Total input paths to process : 1 15/05/20 11:36:14 INFO spark.SparkContext: Starting job: saveAsTextFile at Test1.java:90 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at Test1.java:90) with 4 output partitions (allowLocal=false) 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Final stage: Stage 0(saveAsTextFile at Test1.java:90) 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Missing parents: List() 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90), which has no missing parents 15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(129264) called with curMem=988453294, maxMem=2061647216 15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 126.2 KB, free 1023.4 MB) 15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(78190) called with curMem=988582558, maxMem=2061647216 15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 76.4 KB, free 1023.3 MB) 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Master:44855 (size: 76.4 KB, free: 1492.4 MB) 15/05/20 11:36:15 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0 15/05/20 11:36:15 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90) 15/05/20 11:36:15 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 4 tasks 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, HadoopV26Slave5, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, HadoopV26Slave3, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, HadoopV26Slave4, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, HadoopV26Slave1, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Slave5:45357 (size: 76.4 KB, free: 2.1 GB) 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Slave3:57821 (size: 76.4 KB, free: 2.1 GB) ……. 15/05/20 11:36:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece1 in memory on HadoopV26Slave5:45357 (size: 4.0 MB, free: 1646.3 MB) * And didn’t go forward as I still waiting, basically not stop, but more like stuck. I have 6 workers/VMs: each of them has 8GB memory and 12GB disk storage. 
After a few mins pass, the program stopped and showed something like this: 15/05/20 11:42:45 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, HadoopV26Slave3): org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/output/_temporary/0/_temporary/attempt_201505201136__m_01_1/part-1 could only be replicated to 0 nodes instead of minReplication (=1). There are 6 datanode(s) running and no node(s) are excluded in this operation. at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1549) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3200) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) at
Re: Hive on Spark VS Spark SQL
SparkSQL was built to improve upon the Hive on Spark runtime further... On Tue, May 19, 2015 at 10:37 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Between Hive on Spark and SparkSQL, which should be better, and what are the key characteristics and the advantages and the disadvantages of each? -- guoqing0...@yahoo.com.hk
Re: Spark and RabbitMQ
Hi, There is a RabbitMQ receiver for spark-streaming http://search.maven.org/#artifactdetails|com.stratio.receiver|rabbitmq|0.1.0-RELEASE|jar https://github.com/Stratio/RabbitMQ-Receiver 2015-05-12 14:49 GMT+02:00 Dmitry Goldenberg dgoldenberg...@gmail.com: Thanks, Akhil. It looks like in the second example, for Rabbit they're doing this: https://www.rabbitmq.com/mqtt.html. On Tue, May 12, 2015 at 7:37 AM, Akhil Das ak...@sigmoidanalytics.com wrote: I found two examples: a Java version https://github.com/deepakkashyap/Spark-Streaming-with-RabbitMQ-/blob/master/example/Spark_project/CustomReceiver.java and a Scala version https://github.com/d1eg0/spark-streaming-toy Thanks Best Regards On Tue, May 12, 2015 at 2:31 AM, dgoldenberg dgoldenberg...@gmail.com wrote: Are there existing or under-development versions/modules for streaming messages out of RabbitMQ with Spark Streaming, or perhaps a RabbitMQ RDD?
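The linked examples follow the standard custom-receiver pattern; the Spark side of it looks roughly like this skeleton sketch (the actual RabbitMQ client calls are omitted and only indicated in comments):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.receiver.Receiver

    class RabbitMQReceiver(queue: String)
        extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

      def onStart(): Unit = {
        // start a thread that connects to RabbitMQ and consumes `queue`
        new Thread("RabbitMQ Receiver") {
          override def run(): Unit = {
            // for each message body received from RabbitMQ:
            //   store(messageBody)   // hands the record to Spark Streaming
          }
        }.start()
      }

      def onStop(): Unit = {
        // close the RabbitMQ connection/channel here
      }
    }

    // usage: val lines = ssc.receiverStream(new RabbitMQReceiver("myQueue"))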
Re: Reading Binary files in Spark program
If you can share the complete code and a sample file, maybe I can try to reproduce it on my end. Thanks Best Regards On Wed, May 20, 2015 at 7:00 AM, Tapan Sharma tapan.sha...@gmail.com wrote: The problem is still there. The exception is not coming at the time of reading. Also the count of the JavaPairRDD is as expected. It is when we are calling the collect() or toArray() methods that the exception comes. Something to do with the Text class even though I haven't used it in the program. Regards Tapan On Tue, May 19, 2015 at 6:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try something like: JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir, org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class, Text.class, new Job().getConfiguration()); With the type of input format that you require. Thanks Best Regards On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com wrote: Hi Team, I am new to Spark and learning. I am trying to read image files into a Spark job. This is how I am doing it: Step 1. Created sequence files with FileName as key and binary image as value, i.e. Text and BytesWritable. I am able to read these sequence files into Map Reduce programs. Step 2. I understand that Text and BytesWritable are non-serializable, therefore I read the sequence file in Spark as follows: SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class); final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect(); The moment I try to call the collect() method to get the keys of the sequence file, the following exception is thrown. Can anyone help me understand why the collect() method is failing? If I use toArray() on the seqFiles object then I get the same call stack.
Regards Tapan java.io.NotSerializableException: org.apache.hadoop.io.Text at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 2015-05-19 15:15:03,705 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not retrying 2015-05-19 15:15:03,731 INFO [task-result-getter-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all completed, from pool 2015-05-19 15:15:03,739 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0 2015-05-19 15:15:03,747 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 0 failed: collect at JavaSequenceFile.java:44, took 4.421397 s Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696) at
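For readers hitting the same NotSerializableException: a usual workaround (a sketch in Scala, not taken from this thread) is to convert the Hadoop Writables into plain serializable types before calling collect(), since Text/BytesWritable are not Java-serializable and are reused by the record reader. `inputDir` is a placeholder path.

    import org.apache.hadoop.io.{BytesWritable, Text}

    // read the sequence file and immediately map Writables to plain types
    val seqFiles = sc.sequenceFile(inputDir, classOf[Text], classOf[BytesWritable])
    val materialized = seqFiles
      .map { case (name, bytes) => (name.toString, bytes.copyBytes()) }
      .collect()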
Re: Multi user setup and saving a DataFrame / RDD to a network exported file system
You could try setting `SPARK_USER` to the user under which your workers are running. I couldn't find many references to this variable, but at least Yarn and Mesos take it into account when spawning executors. Chances are that standalone mode also does it. iulian On Wed, May 20, 2015 at 9:29 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl wrote: Hi, thanks for answer. The rights are drwxr-xr-x 3 tfruboes all 5632 05-19 15:40 test19EE/ I have tried setting the rights to 777 for this directory prior to execution. This does not get propagated down the chain, ie the directory created as a result of the save call (namesAndAges.parquet2 in the path in the dump [1] below) is created with the drwxr-xr-x rights (owned by the user submitting the job, ie tfruboes). The temp directories created inside namesAndAges.parquet2/_temporary/0/ (e.g. task_201505200920_0009_r_01) are owned by root, again with drwxr-xr-x access rights Cheers, Tomasz W dniu 19.05.2015 o 23:56, Davies Liu pisze: It surprises me, could you list the owner information of /mnt/lustre/bigdata/med_home/tmp/test19EE/ ? On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl wrote: Dear Experts, we have a spark cluster (standalone mode) in which master and workers are started from root account. Everything runs correctly to the point when we try doing operations such as dataFrame.select(name, age).save(ofile, parquet) or rdd.saveAsPickleFile(ofile) , where ofile is path on a network exported filesystem (visible on all nodes, in our case this is lustre, I guess on nfs effect would be similar). Unsurprisingly temp files created on workers are owned by root, which then leads to a crash (see [1] below). Is there a solution/workaround for this (e.g. controlling file creation mode of the temporary files)? Cheers, Tomasz ps I've tried to google this problem, couple of similar reports, but no clear answer/solution found ps2 For completeness - running master/workers as a regular user solves the problem only for the given user. For other users submitting to this master the result is given in [2] below [0] Cluster details: Master/workers: centos 6.5 Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build) [1] ## File /mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o27.save. 
: java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet; isDirectory=false; length=534; replication=1; blocksize=33554432; modification_time=1432042832000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690) at org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129) at org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196) at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1181) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Thread.java:745) ## [2] ## 15/05/19 14:45:19 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3, wn23023.cis.gov.pl):
RE: Intermittent difficulties for Worker to contact Master on same machine in standalone
Check whether the name can be resolved in the /etc/hosts file (or DNS) of the worker (the same btw applies for the Node where you run the driver app – all other nodes must be able to resolve its name) From: Stephen Boesch [mailto:java...@gmail.com] Sent: Wednesday, May 20, 2015 10:07 AM To: user Subject: Intermittent difficulties for Worker to contact Master on same machine in standalone What conditions would cause the following delays / failure for a standalone machine/cluster to have the Worker contact the Master? 15/05/20 02:02:53 INFO WorkerWebUI: Started WorkerWebUI at http://10.0.0.3:8081 15/05/20 02:02:53 INFO Worker: Connecting to master akka.tcp://sparkMaster@mellyrn.local:7077/user/Master... 15/05/20 02:02:53 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077 15/05/20 02:03:04 INFO Worker: Retrying connection to master (attempt # 1) .. .. 15/05/20 02:03:26 INFO Worker: Retrying connection to master (attempt # 3) 15/05/20 02:03:26 INFO Worker: Connecting to master akka.tcp://sparkMaster@mellyrn.local:7077/user/Master... 15/05/20 02:03:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077
How to set HBaseConfiguration in Spark
Hi all, I wrote a program to get an HBaseConfiguration object in Spark. But after I printed the contents of this hbase-conf object, I found the values were wrong. For example, the property hbase.zookeeper.quorum should be bgdt01.dev.hrb,bgdt02.dev.hrb,bgdt03.hrb, but the printed value is localhost. Could anybody tell me how to set up the HBase configuration in Spark? It doesn't matter whether it is set in a configuration file or by a Spark API. Many thanks! The code of my program is listed below:

object TestHBaseConf {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val hbConf = HBaseConfiguration.create()
    hbConf.addResource("file:///etc/hbase/conf/hbase-site.xml")
    val it = hbConf.iterator()
    while (it.hasNext) {
      val e = it.next()
      println("Key=" + e.getKey + " Value=" + e.getValue)
    }
    val rdd = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9))
    val result = rdd.sum()
    println("result=" + result)
    sc.stop()
  }
}
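One thing worth trying (a sketch, not a confirmed answer from the list): if hbase-site.xml is not actually visible where the configuration is created, the relevant properties can be set explicitly on the HBaseConfiguration. The quorum hosts below are the ones from the question; the client port is an assumed default.

    import org.apache.hadoop.hbase.HBaseConfiguration

    val hbConf = HBaseConfiguration.create()
    hbConf.set("hbase.zookeeper.quorum", "bgdt01.dev.hrb,bgdt02.dev.hrb,bgdt03.hrb")
    hbConf.set("hbase.zookeeper.property.clientPort", "2181")  // assumed default port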
PySpark Logs location
Hi, I am executing a PySpark job on YARN (Hortonworks distribution). Could someone point me to where the logs are located? Thanks, Oleg.
Spark Streaming - Design considerations/Knobs
Hi, I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on Spark Streaming. Is my understanding correct? Any other important design consideration that I should take care of?
- A DStream is associated with a single receiver. For attaining read parallelism, multiple receivers i.e. multiple DStreams need to be created.
- A receiver is run within an executor. It occupies one core. Ensure that there are enough cores for processing after receiver slots are booked i.e. spark.cores.max should take the receiver slots into account.
- The receivers are allocated to executors in a round robin fashion.
- When data is received from a stream source, the receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval, where N = batchInterval/blockInterval.
- These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing.
- An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in Spark. blockInterval == batchInterval would mean that a single partition is created and probably it is processed locally.
- Having a bigger blockInterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found between these two parameters to ensure that the bigger blocks are processed locally.
- Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in the RDD randomly to create n partitions.
- An RDD's processing is scheduled by the driver's jobscheduler as a job. At a given point of time only one job is active. So, if one job is executing the other jobs are queued.
- If you have two dstreams there will be two RDDs formed and there will be two jobs created which will be scheduled one after the other.
- To avoid this, you can union the two dstreams (see the sketch after this list). This ensures that a single unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then considered as a single job. However, the partitioning of the RDDs is not impacted.
- If the batch processing time is more than the batchInterval then obviously the receiver's memory will start filling up and will end up throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver.
- For being fully fault tolerant, Spark Streaming needs to enable checkpointing. Checkpointing increases the batch processing time.
- The frequency of metadata checkpoint cleaning can be controlled using spark.cleaner.ttl. But data checkpoint cleaning happens automatically when the RDDs in the checkpoint are no longer required.
Thanks, Hemant
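A minimal sketch of the union and repartition knobs from the list above (the source, host/port, receiver count and partition count are all placeholders):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("KnobsExample")
    val ssc = new StreamingContext(conf, Seconds(10))

    // several receivers/DStreams for read parallelism, unioned so each batch
    // produces a single job instead of one job per stream
    val streams = (1 to 3).map(_ => ssc.socketTextStream("host", 9999))
    val unified = ssc.union(streams)

    // decouple processing parallelism from batchInterval/blockInterval
    val repartitioned = unified.repartition(12)
    repartitioned.count().print()

    ssc.start()
    ssc.awaitTermination()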
Re: Hive on Spark VS Spark SQL
And if I am not wrong, the Spark SQL API is intended to move closer to SQL standards. I feel it's a clever decision on Spark's part to keep both APIs operational. These short-term confusions are worth the long-term benefits. On 20 May 2015 17:19, Sean Owen so...@cloudera.com wrote: I don't think that's quite the difference. Any SQL engine has a query planner and an execution engine. Both of these use Spark for execution. HoS uses Hive for query planning. Although it's not optimized for execution on Spark per se, it's got a lot of language support and is stable/mature. Spark SQL's query planner is less developed at this point but purpose-built for Spark as an execution engine. Spark SQL is also how you put SQL-like operations in a Spark program -- programmatic SQL if you will -- which isn't what Hive or therefore HoS does. HoS is good if you're already using Hive and need its language features and need it as it works today, and want a faster batch execution version of it. On Wed, May 20, 2015 at 7:18 AM, Debasish Das debasish.da...@gmail.com wrote: SparkSQL was built to improve upon the Hive on Spark runtime further... On Tue, May 19, 2015 at 10:37 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Between Hive on Spark and SparkSQL, which should be better, and what are the key characteristics and the advantages and the disadvantages of each? -- guoqing0...@yahoo.com.hk
Intermittent difficulties for Worker to contact Master on same machine in standalone
What conditions would cause the following delays / failure for a standalone machine/cluster to have the Worker contact the Master? 15/05/20 02:02:53 INFO WorkerWebUI: Started WorkerWebUI at http://10.0.0.3:8081 15/05/20 02:02:53 INFO Worker: Connecting to master akka.tcp://sparkMaster@mellyrn.local:7077/user/Master... 15/05/20 02:02:53 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077 15/05/20 02:03:04 INFO Worker: Retrying connection to master (attempt # 1) .. .. 15/05/20 02:03:26 INFO Worker: Retrying connection to master (attempt # 3) 15/05/20 02:03:26 INFO Worker: Connecting to master akka.tcp://sparkMaster@mellyrn.local:7077/user/Master... 15/05/20 02:03:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077
Re: Multi user setup and saving a DataFrame / RDD to a network exported file system
Thanks for the suggestion. I have tried playing with it; sc.sparkUser() gives me the expected user name, but it doesn't solve the problem. From a quick search through the spark code it seems to me that this setting is effective only for yarn and mesos. I think the workaround for the problem could be using --deploy-mode cluster (not 100% convenient, since it disallows any interactive work), but this is not supported for python based programs. Cheers, Tomasz On 20.05.2015 at 10:57, Iulian Dragoș wrote: You could try setting `SPARK_USER` to the user under which your workers are running. I couldn't find many references to this variable, but at least Yarn and Mesos take it into account when spawning executors. Chances are that standalone mode also does it. iulian On Wed, May 20, 2015 at 9:29 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl wrote: Hi, thanks for the answer. The rights are drwxr-xr-x 3 tfruboes all 5632 05-19 15:40 test19EE/ I have tried setting the rights to 777 for this directory prior to execution. This does not get propagated down the chain, ie the directory created as a result of the save call (namesAndAges.parquet2 in the path in the dump [1] below) is created with the drwxr-xr-x rights (owned by the user submitting the job, ie tfruboes). The temp directories created inside namesAndAges.parquet2/_temporary/0/ (e.g. task_201505200920_0009_r_01) are owned by root, again with drwxr-xr-x access rights. Cheers, Tomasz On 19.05.2015 at 23:56, Davies Liu wrote: It surprises me, could you list the owner information of /mnt/lustre/bigdata/med_home/tmp/test19EE/ ? On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl wrote: Dear Experts, we have a spark cluster (standalone mode) in which master and workers are started from the root account. Everything runs correctly to the point when we try doing operations such as dataFrame.select("name", "age").save(ofile, "parquet") or rdd.saveAsPickleFile(ofile), where ofile is a path on a network-exported filesystem (visible on all nodes; in our case this is lustre, I guess on NFS the effect would be similar). Unsurprisingly temp files created on workers are owned by root, which then leads to a crash (see [1] below). Is there a solution/workaround for this (e.g. controlling file creation mode of the temporary files)? Cheers, Tomasz ps I've tried to google this problem, found a couple of similar reports, but no clear answer/solution ps2 For completeness - running master/workers as a regular user solves the problem only for the given user. For other users submitting to this master the result is given in [2] below [0] Cluster details: Master/workers: centos 6.5 Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build) [1] ## File /mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o27.save.
: java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet; isDirectory=false; length=534; replication=1; blocksize=33554432; modification_time=1432042832000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43) at org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690) at
Re: spark streaming doubt
So I can explicitly specify the number of receivers and executors in receiver-based streaming? Can you share a sample program if any? Also in low-level non-receiver-based streaming, will data be fetched and processed by the same worker executor node? Also if I have concurrent jobs set to 1 - so in low level, fetching and processing will be delayed till the next job starts; say a situation where I have 1 sec of stream interval but my job1 takes 5 sec to complete, hence job2 starts at the end of 5 sec, so now will it process all data from sec1 to sec5 in low-level non-receiver streaming or only for the interval sec1-sec2? And if it processes data for the complete duration sec1-sec5, is there any option to suppress the start of the other queued jobs (for intervals sec2-3, sec3-4, sec4-5) since their work is already done by job2? On Wed, May 20, 2015 at 12:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote: One receiver basically runs on 1 core, so if your single node has 4 cores, there are still 3 cores left for the processing (for executors). And yes, the receiver remains on the same machine unless some failure happens. Thanks Best Regards On Tue, May 19, 2015 at 10:57 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil and Dibyendu. In high-level receiver-based streaming, do executors run on the receiver nodes themselves to have data locality? Or is the data always transferred to executor nodes, with the executor nodes differing in each run of the job but the receiver node remaining the same (same machines) throughout the life of the streaming application unless a node failure happens? On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Just to add, there is a Receiver based Kafka consumer which uses the Kafka Low Level Consumer API. http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Regards, Dibyendu On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote: So for Kafka+spark streaming, receiver-based streaming uses the high-level API and non-receiver-based streaming uses the low-level API. 1. In high-level receiver-based streaming, does it register consumers at each job start (whenever a new job is launched by the streaming application, say at each second)? - Receiver based streaming will always have the receiver running in parallel while your job is running, so by default for every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka. 2. Will the number of executors in high-level receiver-based jobs always equal the number of partitions in the topic? - Not sure where you came up with this. For the non stream based one, I think the number of partitions in spark will be equal to the number of kafka partitions for the given topic. 3. Will data from a single topic be consumed by executors in parallel, or does only one receiver consume in multiple threads and assign to executors in the high-level receiver-based approach? - They will consume the data in parallel. For the receiver based approach, you can actually specify the number of receivers that you want to spawn for consuming the messages. On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: spark.streaming.concurrentJobs takes an integer value, not boolean. If you set it to 2 then 2 jobs will run in parallel. Default value is 1 and the next job will start once it completes the current one. Actually, in the current implementation of Spark Streaming and under default configuration, only one job is active (i.e.
under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then then next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs which is by default set to 1. Its not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources and which can make it hard to debug the whether there is sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to improve this in the future. Copied from TD's answer written in SO http://stackoverflow.com/questions/23528006/how-jobs-are-assigned-to-executors-in-spark-streaming . Non-receiver based streaming for example you can say are the fileStream, directStream ones. You can read a bit of information from here https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html Thanks Best Regards On Tue, May 19, 2015 at 2:13 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil. When I don't set
saveasorcfile on partitioned orc
Hi, I followed the information on https://www.mail-archive.com/reviews@spark.apache.org/msg141113.html to save an ORC file with Spark 1.2.1. I can save data to a new ORC file. I wonder how to save data to an existing and partitioned ORC file? Any suggestions? BR, Patcharee
Re: Code error
Hi Ricardo, instead of filtering the header, just remove the header from your file. In your code you create a filter for the header but you don't use it to compute parsedData.

val parsedData = filter_data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()

2015-05-19 21:23 GMT+02:00 Stephen Boesch java...@gmail.com: Hi Ricardo, providing the error output would help. But in any case you need to do a collect() on the rdd returned from computeCost. 2015-05-19 11:59 GMT-07:00 Ricardo Goncalves da Silva ricardog.si...@telefonica.com: Hi, Can anybody see what's wrong in this piece of code:

./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
val data = sc.textFile("/user/p_loadbd/fraude5.csv").map(x => x.toLowerCase.split(',')).map(x => x(0) + "," + x(1))
val header = data.first()
val filter_data = data.filter(x => x != header)
val parsedData = data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

Thanks. Ricardo Goncalves da Silva, Lead Data Scientist | Seção de Desenvolvimento de Sistemas de Business Intelligence – Projetos de Inovação | IDPB02, Av. Eng. Luis Carlos Berrini, 1.376 – 7º – 04571-000 - SP, ricardog.si...@telefonica.com | www.telefonica.com.br, Tel +55 11 3430 4955 | Cel +55 11 94292 9526 -- Romain Sagean
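Putting the advice above together with the original snippet, the corrected version would look something like this (a sketch; parsedData is now built from filter_data so the header line never reaches Vectors.dense):

    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.linalg.Vectors

    val data = sc.textFile("/user/p_loadbd/fraude5.csv")
      .map(_.toLowerCase.split(','))
      .map(x => x(0) + "," + x(1))
    val header = data.first()
    val filter_data = data.filter(_ != header)
    val parsedData = filter_data.map(s => Vectors.dense(s.split(',').map(_.toDouble))).cache()

    val clusters = KMeans.train(parsedData, 2, 20)
    val WSSSE = clusters.computeCost(parsedData)  // computeCost returns a Double
    println("Within Set Sum of Squared Errors = " + WSSSE)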
Re: spark streaming doubt
On Wed, May 20, 2015 at 1:12 PM, Shushant Arora shushantaror...@gmail.com wrote: So I can explicitly specify no of receivers and executors in receiver based streaming? Can you share a sample program if any? -You can look at the lowlevel consumer repo https://github.com/dibbhatt/kafka-spark-consumer shared by Dibyendu for sample code. Also in Low level non receiver based , will data be fetched by same worker executor node and processed ? Also if I have concurrent jobs set to 1- so in low level fetching and processing will be delayed till next job starts ,say a situation where I have 1 sec of stream interval but my job1 takes 5 sec to complete , hence job2 starts at end of 5 sec, so now will it process all data from sec1 to sec 5 in low level non receiver streaming or only for interval sec1-sec2 ? And if it processes data for complete duration sec1-sec5.Is there any option to suppress start of other queued jobs(for interval sec2-3, sec3-4,sec4-5) since there work is already done by job2 ? - I believe all your data from sec2-sec5 will be available in Kafka and when the second batch starts at 5 sec it will consumer it (you can also limit the rate with spark.streaming.kafka.maxRatePerPartition) Read more here https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md On Wed, May 20, 2015 at 12:36 PM, Akhil Das ak...@sigmoidanalytics.com wrote: One receiver basically runs on 1 core, so if your single node is having 4 cores, there are still 3 cores left for the processing (for executors). And yes receiver remains on the same machine unless some failure happens. Thanks Best Regards On Tue, May 19, 2015 at 10:57 PM, Shushant Arora shushantaror...@gmail.com wrote: Thanks Akhil andDibyendu. Does in high level receiver based streaming executors run on receivers itself to have data localisation ? Or its always data is transferred to executor nodes and executor nodes differ in each run of job but receiver node remains same(same machines) throughout life of streaming application unless node failure happens? On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Just to add, there is a Receiver based Kafka consumer which uses Kafka Low Level Consumer API. http://spark-packages.org/package/dibbhatt/kafka-spark-consumer Regards, Dibyendu On Tue, May 19, 2015 at 9:00 PM, Akhil Das ak...@sigmoidanalytics.com wrote: On Tue, May 19, 2015 at 8:10 PM, Shushant Arora shushantaror...@gmail.com wrote: So for Kafka+spark streaming, Receiver based streaming used highlevel api and non receiver based streaming used low level api. 1.In high level receiver based streaming does it registers consumers at each job start(whenever a new job is launched by streaming application say at each second)? - Receiver based streaming will always have the receiver running parallel while your job is running, So by default for every 200ms (spark.streaming.blockInterval) the receiver will generate a block of data which is read from Kafka. 2.No of executors in highlevel receiver based jobs will always equal to no of partitions in topic ? - Not sure from where did you came up with this. For the non stream based one, i think the number of partitions in spark will be equal to the number of kafka partitions for the given topic. 3.Will data from a single topic be consumed by executors in parllel or only one receiver consumes in multiple threads and assign to executors in high level receiver based approach ? - They will consume the data parallel. 
For the receiver based approach, you can actually specify the number of receiver that you want to spawn for consuming the messages. On Tue, May 19, 2015 at 2:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: spark.streaming.concurrentJobs takes an integer value, not boolean. If you set it as 2 then 2 jobs will run parallel. Default value is 1 and the next job will start once it completes the current one. Actually, in the current implementation of Spark Streaming and under default configuration, only job is active (i.e. under execution) at any point of time. So if one batch's processing takes longer than 10 seconds, then then next batch's jobs will stay queued. This can be changed with an experimental Spark property spark.streaming.concurrentJobs which is by default set to 1. Its not currently documented (maybe I should add it). The reason it is set to 1 is that concurrent jobs can potentially lead to weird sharing of resources and which can make it hard to debug the whether there is sufficient resources in the system to process the ingested data fast enough. With only 1 job running at a time, it is easy to see that if batch processing time batch interval, then the system will be stable. Granted that this may not be the most efficient use of resources under certain conditions. We definitely hope to
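To make the two approaches discussed in this thread concrete, here is a minimal Scala sketch: several explicitly created receivers unioned into one DStream, and the receiver-less direct stream added in Spark 1.3. The ZooKeeper quorum, broker list, consumer group and topic names are placeholders.

import kafka.serializer.StringDecoder
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext(sc, Seconds(1))

// Receiver-based: spawn N receivers (each occupies one core) and union their streams.
val numReceivers = 3
val receiverStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181", "my-consumer-group", Map("my-topic" -> 1))
}
val unified = ssc.union(receiverStreams)
unified.map(_._2).print()

// Direct (no receiver), Spark 1.3+: one RDD partition per Kafka partition;
// spark.streaming.kafka.maxRatePerPartition can cap how much each batch reads.
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker1:9092"), Set("my-topic"))
direct.map(_._2).print()

ssc.start()
ssc.awaitTermination()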
LATERAL VIEW explode issue
Hi, When I use LATERAL VIEW explode on the registered temp table in spark-shell, it works. But when I use the same in spark-submit (as a jar file) it is not working; it gives the error - failure: ``union'' expected but identifier VIEW found. The SQL statement I am using is SELECT id, mapKey FROM locations LATERAL VIEW explode(map_keys(jsonStringToMapUdf(countries))) countries AS mapKey. I registered jsonStringToMapUdf as my SQL function. Thanks, Kiran 9008099770
Re: Reading Binary files in Spark program
I am not doing anything special. *Here is the code:* SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class); // The following statements are throwing the exception final List<Tuple2<String, Byte>> tuple2s = seqFiles.toArray(); // Or final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect(); *And this is how I have created a sequence file:* http://stuartsierra.com/2008/04/24/a-million-little-files Regards Tapan On Wed, May 20, 2015 at 12:42 PM, Akhil Das ak...@sigmoidanalytics.com wrote: If you can share the complete code and a sample file, maybe I can try to reproduce it on my end. Thanks Best Regards On Wed, May 20, 2015 at 7:00 AM, Tapan Sharma tapan.sha...@gmail.com wrote: Problem is still there. The exception is not coming at the time of reading. Also the count of the JavaPairRDD is as expected. It is when we are calling the collect() or toArray() methods that the exception is coming. Something to do with the Text class even though I haven't used it in the program. Regards Tapan On Tue, May 19, 2015 at 6:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try something like: JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir, org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class, Text.class, new Job().getConfiguration()); With the type of input format that you require. Thanks Best Regards On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com wrote: Hi Team, I am new to Spark and learning. I am trying to read image files into a spark job. This is how I am doing it: Step 1. Created sequence files with FileName as key and binary image as value, i.e. Text and BytesWritable. I am able to read these sequence files into Map Reduce programs. Step 2. I understand that Text and BytesWritable are non serializable, therefore I read the sequence file in Spark as follows: SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile"); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class); final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect(); The moment I try to call the collect() method to get the keys of the sequence file, the following exception is thrown. Can anyone help me understand why the collect() method is failing? If I use toArray() on the seqFiles object then I also get the same call stack.
Regards Tapan java.io.NotSerializableException: org.apache.hadoop.io.Text at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 2015-05-19 15:15:03,705 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not retrying 2015-05-19 15:15:03,731 INFO [task-result-getter-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all completed, from pool 2015-05-19 15:15:03,739 INFO [sparkDriver-akka.actor.default-dispatcher-2] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0 2015-05-19 15:15:03,747 INFO [main] scheduler.DAGScheduler (Logging.scala:logInfo(59)) - Job 0 failed: collect at JavaSequenceFile.java:44, took 4.421397 s Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text at org.apache.spark.scheduler.DAGScheduler.org $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214) at
java program got Stuck at broadcasting
The variable I need to broadcast is just 468 MB. When broadcasting, it just “stop” at here: *15/05/20 11:36:14 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 15/05/20 11:36:14 INFO mapred.FileInputFormat: Total input paths to process : 1 15/05/20 11:36:14 INFO spark.SparkContext: Starting job: saveAsTextFile at Test1.java:90 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at Test1.java:90) with 4 output partitions (allowLocal=false) 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Final stage: Stage 0(saveAsTextFile at Test1.java:90) 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Missing parents: List() 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90), which has no missing parents 15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(129264) called with curMem=988453294, maxMem=2061647216 15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 126.2 KB, free 1023.4 MB) 15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(78190) called with curMem=988582558, maxMem=2061647216 15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 76.4 KB, free 1023.3 MB) 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Master:44855 (size: 76.4 KB, free: 1492.4 MB) 15/05/20 11:36:15 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0 15/05/20 11:36:15 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90) 15/05/20 11:36:15 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 4 tasks 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, HadoopV26Slave5, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, HadoopV26Slave3, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, HadoopV26Slave4, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, HadoopV26Slave1, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Slave5:45357 (size: 76.4 KB, free: 2.1 GB) 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Slave3:57821 (size: 76.4 KB, free: 2.1 GB) ……. 15/05/20 11:36:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece1 in memory on HadoopV26Slave5:45357 (size: 4.0 MB, free: 1646.3 MB) * And didn’t go forward as I still waiting, basically not stop, but more like stuck. I have 6 workers/VMs: each of them has 8GB memory and 12GB disk storage. 
After a few mins pass, the program stopped and showed something like this: 15/05/20 11:42:45 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, HadoopV26Slave3): org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/output/_temporary/0/_temporary/attempt_201505201136__m_01_1/part-1 could only be replicated to 0 nodes instead of minReplication (=1). There are 6 datanode(s) running and no node(s) are excluded in this operation. at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1549) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3200) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482) at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039) at
RE: LATERAL VIEW explode issue
Just a guess, but are you using HiveContext in one case vs SqlContext in another? You don't show a stack trace, but this looks like a parser error... which would make me guess a different context or a different Spark version on the cluster you are submitting to... Sent on the new Sprint Network from my Samsung Galaxy S®4. Original message From: kiran mavatoor kirankumar.m...@yahoo.com.INVALID Date: 05/20/2015 5:57 AM (GMT-05:00) To: User user@spark.apache.org Subject: LATERAL VIEW explode issue Hi, When I use LATERAL VIEW explode on the registered temp table in spark shell, it works. But when I use the same in spark-submit (as a jar file) it is not working; it's giving the error - failure: ``union'' expected but identifier VIEW found. The SQL statement I am using is SELECT id, mapKey FROM locations LATERAL VIEW explode(map_keys(jsonStringToMapUdf(countries))) countries AS mapKey. I registered jsonStringToMapUdf as my SQL function. Thanks Kiran 9008099770
Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?
Hi Davies,Thank you for pointing to spark streaming. I am confused about how to return the result after running a function via a thread.I tried using Queue to add the results to it and print it at the end.But here, I can see the results after all threads are finished.How to get the result of the function once a thread is finished, rather than waiting for all other threads to finish? Thanks Regards, Meethu M On Tuesday, 19 May 2015 2:43 AM, Davies Liu dav...@databricks.com wrote: SparkContext can be used in multiple threads (Spark streaming works with multiple threads), for example: import threading import time def show(x): time.sleep(1) print x def job(): sc.parallelize(range(100)).foreach(show) threading.Thread(target=job).start() On Mon, May 18, 2015 at 12:34 AM, ayan guha guha.a...@gmail.com wrote: Hi So to be clear, do you want to run one operation in multiple threads within a function or you want run multiple jobs using multiple threads? I am wondering why python thread module can't be used? Or you have already gave it a try? On 18 May 2015 16:39, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Akhil, The python wrapper for Spark Job Server did not help me. I actually need the pyspark code sample which shows how I can call a function from 2 threads and execute it simultaneously. Thanks Regards, Meethu M On Thursday, 14 May 2015 12:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you happened to have a look at the spark job server? Someone wrote a python wrapper around it, give it a try. Thanks Best Regards On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, Quote Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. How to run multiple jobs in one SPARKCONTEXT using separate threads in pyspark? I found some examples in scala and java, but couldn't find python code. Can anyone help me with a pyspark example? Thanks Regards, Meethu M - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Initial job has not accepted any resources
Hi, I am running Spark jobs with the standalone resource manager and I am gathering several performance metrics from my cluster nodes. I am also gathering disk I/O metrics from my nodes, and because many of my jobs use the same dataset I am trying to prevent the operating system from caching the dataset in memory on every node, so as to gather the correct metrics for every job. Therefore, before I submit my jobs to Spark I clear my caches with the commands: sync ; echo 3 > /proc/sys/vm/drop_caches The problem is that when I do so I see this error at the beginning of the job: WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient memory Ultimately the job runs successfully in most cases, but I feel like this error has a significant effect on the overall execution time of the job, which I try to avoid. I am also pretty confident that there is nothing wrong in my configuration, because when I run jobs without clearing my nodes' caches the above error doesn't come up. I would really appreciate it if anyone could help me with this error. Thanks. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Initial-job-has-not-accepted-any-resources-tp22955.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: save column values of DataFrame to text file
Sorry, but how does that work? Can you specify the details of the problem? On 20 May 2015 at 21:32, oubrik [via Apache Spark User List] ml-node+s1001560n2295...@n3.nabble.com wrote: hi, try like this DataFrame df = sqlContext.load("com.databricks.spark.csv", options); df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv"); for more information: https://github.com/databricks/spark-csv Regards -- PhD student, Social Media Laboratory http://smedia.ust.hk/, Department of Electronic Computer Engineering http://www.ece.ust.hk/ece.php, The Hong Kong University of Science and Technology http://www.ust.hk/. Website: http://www.allanjie.net -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/save-column-values-of-DataFrame-to-text-file-tp22718p22958.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
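If the goal is literally a plain text file with the values of one column, another option besides spark-csv is to drop down to the underlying RDD. A minimal sketch, assuming a DataFrame df with the columns from the example above and an illustrative output path:

// Select one column, turn each Row into a string, and write plain text files.
df.select("year")
  .rdd
  .map(row => row(0).toString)
  .saveAsTextFile("/path/to/year_values")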
Re: Reading Binary files in Spark program
Hi Basically, you need to convert it to a serializable format before doing the collect. You can fire up a spark shell and paste this: val sFile = sc.sequenceFile[LongWritable, Text](/home/akhld/sequence/sigmoid) *.map(_._2.toString)* sFile.take(5).foreach(println) Use the attached sequence file generator and generated sequence file that i used for testing. Also note:If you don't do the .map to convert to string, then it will end up with the serializable Exception that you are hitting. [image: Inline image 1] Thanks Best Regards On Wed, May 20, 2015 at 5:48 PM, Tapan Sharma tapan.sha...@gmail.com wrote: I am not doing anything special. *Here is the code :* SparkConf sparkConf = new SparkConf().setAppName(JavaSequenceFile); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaPairRDDString, Byte seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class) ; // Following statements is giving exception final ListTuple2String, Byte tuple2s = seqFiles.toArray(); // Or final ListTuple2String, Byte tuple2s = seqFiles.collect(); *And this is how I have created a sequence file:* http://stuartsierra.com/2008/04/24/a-million-little-files Regards Tapan On Wed, May 20, 2015 at 12:42 PM, Akhil Das ak...@sigmoidanalytics.com wrote: If you can share the complete code and a sample file, may be i can try to reproduce it on my end. Thanks Best Regards On Wed, May 20, 2015 at 7:00 AM, Tapan Sharma tapan.sha...@gmail.com wrote: Problem is still there. Exception is not coming at the time of reading. Also the count of JavaPairRDD is as expected. It is when we are calling collect() or toArray() methods, the exception is coming. Something to do with Text class even though I haven't used it in the program. Regards Tapan On Tue, May 19, 2015 at 6:26 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Try something like: JavaPairRDDIntWritable, Text output = sc.newAPIHadoopFile(inputDir, org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class, IntWritable.class, Text.class, new Job().getConfiguration()); With the type of input format that you require. Thanks Best Regards On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma tapan.sha...@gmail.com wrote: Hi Team, I am new to Spark and learning. I am trying to read image files into spark job. This is how I am doing: Step 1. Created sequence files with FileName as Key and Binary image as value. i.e. Text and BytesWritable. I am able to read these sequence files into Map Reduce programs. Step 2. I understand that Text and BytesWritable are Non Serializable therefore, I read the sequence file in Spark as following: SparkConf sparkConf = new SparkConf().setAppName(JavaSequenceFile); JavaSparkContext ctx = new JavaSparkContext(sparkConf); JavaPairRDDString, Byte seqFiles = ctx.sequenceFile(args[0], String.class, Byte.class) ; final ListTuple2lt;String, Byte tuple2s = seqFiles.collect(); The moment I try to call collect() method to get the keys of sequence file, following exception has been thrown Can any one help me understanding why collect() method is failing? If I use toArray() on seqFiles object then also I am getting same call stack. 
Regards Tapan java.io.NotSerializableException: org.apache.hadoop.io.Text at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377) at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173) at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 2015-05-19 15:15:03,705 ERROR [task-result-getter-0] scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not retrying 2015-05-19 15:15:03,731 INFO [task-result-getter-0] scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet 0.0, whose tasks have all completed, from pool 2015-05-19 15:15:03,739 INFO
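To make the fix in this thread concrete: the sequence files were written with Text keys and BytesWritable values (Step 1 above), so converting both to plain JVM types before any collect()/toArray() avoids the NotSerializableException. A sketch with a placeholder path:

import org.apache.hadoop.io.{BytesWritable, Text}

val images = sc.sequenceFile("/path/to/seqfile", classOf[Text], classOf[BytesWritable])
  .map { case (name, bytes) =>
    // Copy out of the reused Writable objects into serializable types.
    (name.toString, java.util.Arrays.copyOf(bytes.getBytes, bytes.getLength))
  }

val local: Array[(String, Array[Byte])] = images.collect()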
Re: How to use spark to access HBase with Security enabled
I have a similar problem: I cannot pass the HBase configuration file as an extra classpath to Spark any more using spark.executor.extraClassPath=MY_HBASE_CONF_DIR in Spark 1.3. We used to run this in 1.2 without any problem. On Tuesday, May 19, 2015, donhoff_h 165612...@qq.com wrote: Sorry, this ref does not help me. I have set up the configuration in hbase-site.xml. But it seems there are still some extra configurations to be set or APIs to be called to make my spark program able to pass the authentication with HBase. Does anybody know how to set authentication to a secured HBase in a spark program which uses the API newAPIHadoopRDD to get information from HBase? Many Thanks! -- Original Message -- *From:* yuzhihong yuzhih...@gmail.com *Sent:* Tuesday, May 19, 2015, 9:54 PM *To:* donhoff_h 165612...@qq.com *Cc:* user user@spark.apache.org *Subject:* Re: How to use spark to access HBase with Security enabled Please take a look at: http://hbase.apache.org/book.html#_client_side_configuration_for_secure_operation Cheers On Tue, May 19, 2015 at 5:23 AM, donhoff_h 165612...@qq.com wrote: The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my spark programs. I am sure I have run the kinit command to make it take effect. And I also used the HBase Shell to verify that this user has the right to scan and put the tables in HBase. Now I still have no idea how to solve this problem. Can anybody help me to figure it out? Many Thanks! -- Original Message -- *From:* yuzhihong yuzhih...@gmail.com *Sent:* Tuesday, May 19, 2015, 7:55 PM *To:* donhoff_h 165612...@qq.com *Cc:* user user@spark.apache.org *Subject:* Re: How to use spark to access HBase with Security enabled Which user did you run your program as? Have you granted proper permission on the hbase side? You should also check the master log to see if there was some clue. Cheers On May 19, 2015, at 2:41 AM, donhoff_h 165612...@qq.com wrote: Hi, experts. I ran the HBaseTest program, which is an example from the Apache Spark source code, to learn how to use spark to access HBase. But I met the following exception: Exception in thread main org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after attempts=36, exceptions: Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 'hbase:meta' at region=hbase:meta,,1.1588230740, hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0 I also checked the RegionServer log of the host bgdt01.dev.hrb listed in the above exception. I found a few entries like the following one: 2015-05-19 16:59:11,143 DEBUG [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: RpcServer.listener,port=16020: Caught exception while reading:Authentication is required The above entry did not point to my program clearly. But the time is very near. Since my hbase version is HBase1.0.0 and I set security enabled, I suspect the exception was caused by Kerberos authentication. But I am not sure. Does anybody know if my guess is right? And if I am right, could anybody tell me how to set Kerberos authentication in a spark program? I don't know how to do it.
I already checked the API doc , but did not found any API useful. Many Thanks! By the way, my spark version is 1.3.0. I also paste the code of HBaseTest in the following: ***Source Code** object HBaseTest { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName(HBaseTest) val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() conf.set(TableInputFormat.INPUT_TABLE, args(0)) // Initialize hBase table if necessary val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(args(0))) { val tableDesc = new HTableDescriptor(args(0)) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable], classOf[org.apache.hadoop.hbase.client.Result]) hBaseRDD.count() sc.stop() } } -- Many thanks. Bill
Re: Incrementally add/remove vertices in GraphX
Any updates on GraphX Streaming? There was mention of this about a year ago, but nothing much since. Thanks! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Incrementally-add-remove-vertices-in-GraphX-tp2227p22963.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Wish for 1.4: upper bound on # tasks in Mesos
To put this on the devs' radar, I suggest creating a JIRA for it (and checking first if one already exists). issues.apache.org/jira/ Nick On Tue, May 19, 2015 at 1:34 PM Matei Zaharia matei.zaha...@gmail.com wrote: Yeah, this definitely seems useful there. There might also be some ways to cap the application in Mesos, but I'm not sure. Matei On May 19, 2015, at 1:11 PM, Thomas Dudziak tom...@gmail.com wrote: I'm using fine-grained for a multi-tenant environment which is why I would welcome the limit of tasks per job :) cheers, Tom On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia matei.zaha...@gmail.com wrote: Hey Tom, Are you using the fine-grained or coarse-grained scheduler? For the coarse-grained scheduler, there is a spark.cores.max config setting that will limit the total # of cores it grabs. This was there in earlier versions too. Matei On May 19, 2015, at 12:39 PM, Thomas Dudziak tom...@gmail.com wrote: I read the other day that there will be a fair number of improvements in 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a configurable limit for the number of tasks for jobs run on Mesos ? This would be a very simple yet effective way to prevent a job dominating the cluster. cheers, Tom
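For reference, the cap Matei mentions is an ordinary configuration entry; it limits the total number of cores a single application grabs under the coarse-grained Mesos or standalone schedulers (the value below is arbitrary).

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("capped-app")
  .set("spark.cores.max", "16")   // or: spark-submit --conf spark.cores.max=16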
Re: Is this a good use case for Spark?
Spark is a great framework to do things in parallel with multiple machines, will be really helpful for your case. Once you can wrap your entire pipeline into a single Python function: def process_document(path, text): # you can call other tools or services here return xxx then you can process all the documents in parallel as easy as: sc.wholeTextFiles(path/to/documents).map(lambda (k, v): process_document(k, v)).saveAsXXX(path/in/s3) On Wed, May 20, 2015 at 12:38 AM, jakeheller j...@casetext.com wrote: Hi all, I'm new to Spark -- so new that we're deciding whether to use it in the first place, and I was hoping someone here could help me figure that out. We're doing a lot of processing of legal documents -- in particular, the entire corpus of American law. It's about 10m documents, many of which are quite large as far as text goes (100s of pages). We'd like to (a) transform these documents from the various (often borked) formats they come to us in into a standard XML format, (b) when it is in a standard format, extract information from them (e.g., which judicial cases cite each other?) and annotate the documents with the information extracted, and then (c) deliver the end result to a repository (like s3) where it can be accessed by the user-facing application. Of course, we'd also like to do all of this quickly -- optimally, running the entire database through the whole pipeline in a few hours. We currently use a mix of Python and Java scripts (including XSLT, and NLP/unstructured data tools like UIMA and Stanford's CoreNLP) in various places along the pipeline we built for ourselves to handle these tasks. The current pipeline infrastructure was built a while back -- it's basically a number of HTTP servers that each have a single task and pass the document along from server to server as it goes through the processing pipeline. It's great although it's having trouble scaling, and there are some reliability issues. It's also a headache to handle all the infrastructure. For what it's worth, metadata about the documents resides in SQL, and the actual text of the documents lives in s3. It seems like Spark would be ideal for this, but after some searching I wasn't able to find too many examples of people using it for document-processing tasks (like transforming documents from one XML format into another) and I'm not clear if I can chain those sorts of tasks and NLP tasks, especially if some happen in Python and others in Java. Finally, I don't know if the size of the data (i.e., we'll likely want to run operations on whole documents, rather than just lines) imposes issues/constraints. Thanks all! Jake -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Is-this-a-good-use-case-for-Spark-tp22954.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: java program Get Stuck at broadcasting
This is more like an issue with your HDFS setup, can you check in the datanode logs? Also try putting a new file in HDFS and see if that works. Thanks Best Regards On Wed, May 20, 2015 at 11:47 AM, allanjie allanmcgr...@gmail.com wrote: Hi All, The variable I need to broadcast is just 468 MB. When broadcasting, it just “stop” at here: * 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 15/05/20 11:36:14 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id 15/05/20 11:36:14 INFO mapred.FileInputFormat: Total input paths to process : 1 15/05/20 11:36:14 INFO spark.SparkContext: Starting job: saveAsTextFile at Test1.java:90 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at Test1.java:90) with 4 output partitions (allowLocal=false) 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Final stage: Stage 0(saveAsTextFile at Test1.java:90) 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Parents of final stage: List() 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Missing parents: List() 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90), which has no missing parents 15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(129264) called with curMem=988453294, maxMem=2061647216 15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 126.2 KB, free 1023.4 MB) 15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(78190) called with curMem=988582558, maxMem=2061647216 15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 76.4 KB, free 1023.3 MB) 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Master:44855 (size: 76.4 KB, free: 1492.4 MB) 15/05/20 11:36:15 INFO storage.BlockManagerMaster: Updated info of block broadcast_2_piece0 15/05/20 11:36:15 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:839 15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting 4 missing tasks from Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90) 15/05/20 11:36:15 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 4 tasks 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, HadoopV26Slave5, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, HadoopV26Slave3, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 0.0 (TID 2, HadoopV26Slave4, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 3.0 in stage 0.0 (TID 3, HadoopV26Slave1, NODE_LOCAL, 1387 bytes) 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Slave5:45357 (size: 76.4 KB, free: 2.1 GB) 15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on HadoopV26Slave3:57821 (size: 76.4 KB, free: 2.1 GB) ……. 
15/05/20 11:36:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece1 in memory on HadoopV26Slave5:45357 (size: 4.0 MB, free: 1646.3 MB) * And didn’t go forward as I still waiting, basically not stop, but more like stuck. I have 6 workers/VMs: each of them has 8GB memory and 12GB disk storage. After a few mins pass, the program stopped and showed something like this: 15/05/20 11:42:45 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, HadoopV26Slave3): org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /user/output/_temporary/0/_temporary/attempt_201505201136__m_01_1/part-1 could only be replicated to 0 nodes instead of minReplication (=1). There are 6 datanode(s) running and no node(s) are excluded in this operation. at org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1549) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3200) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482) at
Re: How to run multiple jobs in one sparkcontext from separate threads in pyspark?
I think this is a general multiple-threading question, Queue is the right direction to go. Have you try something like this? results = Queue.Queue() def run_job(f, args): r = f(*args) results.put(r) # start multiple threads to run jobs threading.Thread(target=run_job, args=(f, args,)).start() while True: r = results.get() print r On Wed, May 20, 2015 at 5:56 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Davies, Thank you for pointing to spark streaming. I am confused about how to return the result after running a function via a thread. I tried using Queue to add the results to it and print it at the end.But here, I can see the results after all threads are finished. How to get the result of the function once a thread is finished, rather than waiting for all other threads to finish? Thanks Regards, Meethu M On Tuesday, 19 May 2015 2:43 AM, Davies Liu dav...@databricks.com wrote: SparkContext can be used in multiple threads (Spark streaming works with multiple threads), for example: import threading import time def show(x): time.sleep(1) print x def job(): sc.parallelize(range(100)).foreach(show) threading.Thread(target=job).start() On Mon, May 18, 2015 at 12:34 AM, ayan guha guha.a...@gmail.com wrote: Hi So to be clear, do you want to run one operation in multiple threads within a function or you want run multiple jobs using multiple threads? I am wondering why python thread module can't be used? Or you have already gave it a try? On 18 May 2015 16:39, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi Akhil, The python wrapper for Spark Job Server did not help me. I actually need the pyspark code sample which shows how I can call a function from 2 threads and execute it simultaneously. Thanks Regards, Meethu M On Thursday, 14 May 2015 12:38 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Did you happened to have a look at the spark job server? Someone wrote a python wrapper around it, give it a try. Thanks Best Regards On Thu, May 14, 2015 at 11:10 AM, MEETHU MATHEW meethu2...@yahoo.co.in wrote: Hi all, Quote Inside a given Spark application (SparkContext instance), multiple parallel jobs can run simultaneously if they were submitted from separate threads. How to run multiple jobs in one SPARKCONTEXT using separate threads in pyspark? I found some examples in scala and java, but couldn't find python code. Can anyone help me with a pyspark example? Thanks Regards, Meethu M - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: How to set HBaseConfiguration in Spark
The Cloudera blog has some details. Please check if this is helpful to you. http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ Thanks. On Wed, May 20, 2015 at 4:21 AM, donhoff_h 165612...@qq.com wrote: Hi, all I wrote a program to get an HBaseConfiguration object in Spark. But after I printed the content of this hbase-conf object, I found the values were wrong. For example, the property hbase.zookeeper.quorum should be bgdt01.dev.hrb,bgdt02.dev.hrb,bgdt03.hrb. But the printed value is localhost. Could anybody tell me how to set up the HBase configuration in Spark? It does not matter whether it is set in a configuration file or by a Spark API. Many Thanks! The code of my program is listed below: object TestHBaseConf { def main(args: Array[String]) { val conf = new SparkConf() val sc = new SparkContext(conf) val hbConf = HBaseConfiguration.create() hbConf.addResource("file:///etc/hbase/conf/hbase-site.xml") val it = hbConf.iterator() while(it.hasNext) { val e = it.next() println("Key=" + e.getKey + " Value=" + e.getValue) } val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8,9)) val result = rdd.sum() println("result=" + result) sc.stop() } }
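One likely cause of the localhost values: Hadoop's Configuration.addResource(String) treats its argument as a classpath resource name, so a file:// URL is quietly ignored and the defaults remain. Passing a Path, or setting the properties explicitly, gets around that; a sketch (the quorum value is taken from the post):

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.HBaseConfiguration

val hbConf = HBaseConfiguration.create()

// Load an absolute file via a Path (or put hbase-site.xml on the driver/executor classpath).
hbConf.addResource(new Path("/etc/hbase/conf/hbase-site.xml"))

// Alternatively, set the critical properties directly:
hbConf.set("hbase.zookeeper.quorum", "bgdt01.dev.hrb,bgdt02.dev.hrb,bgdt03.hrb")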
Re: LATERAL VIEW explode issue
Hi Yana, I was using a SQLContext in the program, created with new SQLContext(sc). That caused the problem when I submitted the job using spark-submit. Whereas, when I run the same program in spark-shell, the default context is a HiveContext (it seems) and everything is fine. This created the confusion. As a solution, I called new HiveContext(sc) instead of SQLContext. cheers, kiran. On Wednesday, May 20, 2015 6:38 PM, yana yana.kadiy...@gmail.com wrote: Just a guess, but are you using HiveContext in one case vs SqlContext in another? You don't show a stack trace, but this looks like a parser error... which would make me guess a different context or a different Spark version on the cluster you are submitting to... Sent on the new Sprint Network from my Samsung Galaxy S®4. Original message From: kiran mavatoor Date: 05/20/2015 5:57 AM (GMT-05:00) To: User Subject: LATERAL VIEW explode issue Hi, When I use LATERAL VIEW explode on the registered temp table in spark shell, it works. But when I use the same in spark-submit (as a jar file) it is not working; it's giving the error - failure: ``union'' expected but identifier VIEW found. The SQL statement I am using is SELECT id, mapKey FROM locations LATERAL VIEW explode(map_keys(jsonStringToMapUdf(countries))) countries AS mapKey. I registered jsonStringToMapUdf as my SQL function. Thanks, Kiran 9008099770
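To make the resolution concrete, here is a minimal sketch of what the spark-submit version needs. The data source, DataFrame name and the UDF registration are placeholders standing in for whatever the job already does; the key point is that both the table and the SQL run against the same HiveContext.

import org.apache.spark.sql.hive.HiveContext

// spark-shell's default sqlContext is Hive-backed, so LATERAL VIEW parses there;
// a submitted jar has to create the HiveContext itself.
val hiveContext = new HiveContext(sc)

// Build (or re-register) the data and the UDF on this same context.
// hiveContext.udf.register("jsonStringToMapUdf", ...)
val locationsDF = hiveContext.jsonFile("/path/to/locations.json")   // placeholder source
locationsDF.registerTempTable("locations")

val keys = hiveContext.sql(
  """SELECT id, mapKey
    |FROM locations
    |LATERAL VIEW explode(map_keys(jsonStringToMapUdf(countries))) countries AS mapKey""".stripMargin)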
Re: Re: spark 1.3.1 jars in repo1.maven.org
I don't think any of those problems are related to Hadoop. Have you looked at userClassPathFirst settings? On Wed, May 20, 2015 at 6:46 PM, Edward Sargisson ejsa...@gmail.com wrote: Hi Sean and Ted, Thanks for your replies. I don't have our current problems nicely written up as good questions yet. I'm still sorting out classpath issues, etc. In case it is of help, I'm seeing: * Exception in thread Spark Context Cleaner java.lang.NoClassDefFoundError: 0 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149) * We've been having clashing dependencies between a colleague and I because of the aforementioned classpath issue * The clashing dependencies are also causing issues with what jetty libraries are available in the classloader from Spark and don't clash with existing libraries we have. More anon, Cheers, Edward Original Message Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38 From: Sean Owen so...@cloudera.com To: Edward Sargisson esa...@pobox.com Cc: user user@spark.apache.org Yes, the published artifacts can only refer to one version of anything (OK, modulo publishing a large number of variants under classifiers). You aren't intended to rely on Spark's transitive dependencies for anything. Compiling against the Spark API has no relation to what version of Hadoop it binds against because it's not part of any API. You mark the Spark dependency even as provided in your build and get all the Spark/Hadoop bindings at runtime from our cluster. What problem are you experiencing? On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com wrote: Hi, I'd like to confirm an observation I've just made. Specifically that spark is only available in repo1.maven.org for one Hadoop variant. The Spark source can be compiled against a number of different Hadoops using profiles. Yay. However, the spark jars in repo1.maven.org appear to be compiled against one specific Hadoop and no other differentiation is made. (I can see a difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in the version I compiled locally). The implication here is that if you have a pom file asking for spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2 version. Maven assumes that non-snapshot artifacts never change so trying to load an Hadoop 1 version will end in tears. This then means that if you compile code against spark-core then there will probably be classpath NoClassDefFound issues unless the Hadoop 2 version is exactly the one you want. Have I gotten this correct? It happens that our little app is using a Spark context directly from a Jetty webapp and the classpath differences were/are causing some confusion. We are currently installing a Hadoop 1 spark master and worker. Thanks a lot! Edward
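For reference, the settings Sean points to are plain configuration entries. The key names below are the ones used around the Spark 1.3 line (earlier releases used spark.files.userClassPathFirst), so treat them as something to verify against the docs for the version in use.

import org.apache.spark.SparkConf

// Prefer the application's own jars over Spark's bundled dependencies on the classpath.
val conf = new SparkConf()
  .set("spark.driver.userClassPathFirst", "true")
  .set("spark.executor.userClassPathFirst", "true")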
FP Growth saveAsTextFile
I am having trouble saving an FP-Growth model as a text file. I can print out the results, but when I try to save the model I get a NullPointerException. model.freqItemsets.saveAsTextFile("c://fpGrowth/model") Thanks, Eric
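One thing worth trying before digging deeper: freqItemsets is an RDD of FreqItemset objects, so mapping them to strings yourself and saving that RDD isolates whether the problem is the model or the write path. A sketch (the output path is illustrative; on Windows, a missing winutils.exe/HADOOP_HOME is another common source of NullPointerExceptions when saving files).

// model: org.apache.spark.mllib.fpm.FPGrowthModel
val asText = model.freqItemsets.map { itemset =>
  itemset.items.mkString("[", ",", "]") + ", " + itemset.freq
}
asText.saveAsTextFile("c:/fpGrowth/model-text")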
Re: Spark 1.3.1 - SQL Issues
The docs had been updated. You should convert the DataFrame to RDD by `df.rdd` On Mon, Apr 20, 2015 at 5:23 AM, ayan guha guha.a...@gmail.com wrote: Hi Just upgraded to Spark 1.3.1. I am getting an warning Warning (from warnings module): File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py, line 191 warnings.warn(inferSchema is deprecated, please use createDataFrame instead) UserWarning: inferSchema is deprecated, please use createDataFrame instead However, documentation still says to use inferSchema. Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in section Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Rank:8 Lmbda:1.0 iteration:10 Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD -- Best Regards, Ayan Guha - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo
I'm running Spark v1.3.1 and when I run the following against my dataset: model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatu res, maxDepth=6, numIterations=3) The job will fail with the following message: Traceback (most recent call last): File /Users/drake/fd/spark/mltest.py, line 73, in module model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py, line 553, in trainRegressor loss, numIterations, learningRate, maxDepth) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py, line 438, in _train loss, numIterations, learningRate, maxDepth) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py, line 120, in callMLlibFunc return callJavaFunc(sc, api, *args) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py, line 113, in callJavaFunc return _java2py(sc, func(*args)) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value 15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95 py4j.protocol.Py4JJavaError: An error occurred while calling o69.trainGradientBoostedTreesModel. : java.lang.IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) = max categories in categorical features (= 1895) at scala.Predef$.require(Predef.scala:233) at org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128) at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138) at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60) at org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150) at org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63) at org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96) at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595) So, it's complaining about the maxBins, if I provide maxBins=1900 and re-run it: model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatu res, maxDepth=6, numIterations=3, maxBins=1900) Traceback (most recent call last): File /Users/drake/fd/spark/mltest.py, line 73, in module model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catF eatures, maxDepth=6, numIterations=3, maxBins=1900) TypeError: trainRegressor() got an unexpected keyword argument 'maxBins' It now says it knows nothing of maxBins. If I run the same command against DecisionTree or RandomForest (with maxBins=1900) it works just fine. Seems like a bug in GradientBoostedTrees. Suggestions? -Don -- Donald Drake Drake Consulting http://www.drakeconsulting.com/ 800-733-2143
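As the second traceback shows, the 1.3.1 Python wrapper for GradientBoostedTrees does not expose maxBins the way DecisionTree and RandomForest do. Until it does, one possible workaround is the Scala API, where maxBins sits on the tree strategy. A sketch only: trainingData (an RDD[LabeledPoint]) and the categorical-feature map are placeholders.

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy

// Feature 0 is assumed to have 1895 categories here, matching the error message.
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 3
boostingStrategy.treeStrategy.maxDepth = 6
boostingStrategy.treeStrategy.maxBins = 1900
boostingStrategy.treeStrategy.categoricalFeaturesInfo = Map(0 -> 1895)

val model = GradientBoostedTrees.train(trainingData, boostingStrategy)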
Re: Spark Streaming - Design considerations/Knobs
Correcting the ones that are incorrect or incomplete. BUT this is good list for things to remember about Spark Streaming. On Wed, May 20, 2015 at 3:40 AM, Hemant Bhanawat hemant9...@gmail.com wrote: Hi, I have compiled a list (from online sources) of knobs/design considerations that need to be taken care of by applications running on spark streaming. Is my understanding correct? Any other important design consideration that I should take care of? - A DStream is associated with a single receiver. For attaining read parallelism multiple receivers i.e. multiple DStreams need to be created. - A receiver is run within an executor. It occupies one core. Ensure that there are enough cores for processing after receiver slots are booked i.e. spark.cores.max should take the receiver slots into account. - The receivers are allocated to executors in a round robin fashion. - When data is received from a stream source, receiver creates blocks of data. A new block of data is generated every blockInterval milliseconds. N blocks of data are created during the batchInterval where N = batchInterval/blockInterval. - These blocks are distributed by the BlockManager of the current executor to the block managers of other executors. After that, the Network Input Tracker running on the driver is informed about the block locations for further processing. - A RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in spark. blockInterval== batchinterval would mean that a single partition is created and probably it is processed locally. The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that has the blocks irrespective of block interval, unless non-local scheduling kicks in (as you observed next). - Having bigger blockinterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found out between these two parameters to ensure that the bigger blocks are processed locally. - Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling dstream.repartition(n). This reshuffles the data in RDD randomly to create n number of partitions. Yes, for greater parallelism. Though comes at the cost of a shuffle. - An RDD's processing is scheduled by driver's jobscheduler as a job. At a given point of time only one job is active. So, if one job is executing the other jobs are queued. - If you have two dstreams there will be two RDDs formed and there will be two jobs created which will be scheduled one after the another. - To avoid this, you can union two dstreams. This will ensure that a single unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then considered as a single job. However the partitioning of the RDDs is not impacted. To further clarify, the jobs depend on the number of output operations (print, foreachRDD, saveAsXFiles) and the number of RDD actions in those output operations. 
dstream1.union(dstream2).foreachRDD { rdd => rdd.count() } // one Spark job per batch
dstream1.union(dstream2).foreachRDD { rdd => { rdd.count() ; rdd.count() } } // TWO Spark jobs per batch
dstream1.foreachRDD { rdd => rdd.count } ; dstream2.foreachRDD { rdd => rdd.count } // TWO Spark jobs per batch
- If the batch processing time is more than the batch interval then obviously the receiver's memory will start filling up and will end up throwing exceptions (most probably BlockNotFoundException). Currently there is no way to pause the receiver. You can limit the rate of the receiver using the SparkConf config spark.streaming.receiver.maxRate
- For being fully fault tolerant, Spark Streaming needs to enable checkpointing. Checkpointing increases the batch processing time. Incomplete. There are two types of checkpointing - data and metadata. Only data checkpointing, needed by only some operations, increases batch processing time. Read - http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing Furthermore, with checkpointing you can recover the computation, but you may lose some data (that was received but not processed before the driver failed) for some sources. Enabling write ahead logs and a reliable source + receiver allows zero data loss. Read - WAL in http://spark.apache.org/docs/latest/streaming-programming-guide.html#fault-tolerance-semantics
- The frequency of metadata checkpoint cleaning can be controlled using spark.cleaner.ttl. But data checkpoint cleaning happens automatically when the RDDs in the checkpoint are no longer required. Incorrect. metadata checkpointing or
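To make the knobs above concrete, a rough Scala sketch (the application name, hosts, paths and every value below are placeholders to tune, not recommendations):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-knobs-sketch")
  .set("spark.streaming.blockInterval", "200")      // ms; roughly batchInterval/blockInterval blocks (partitions) per batch
  .set("spark.streaming.receiver.maxRate", "10000") // records/sec per receiver
val ssc = new StreamingContext(conf, Seconds(10))   // batch interval
ssc.checkpoint("hdfs:///tmp/checkpoints")           // metadata (and, where needed, data) checkpointing

// two receivers for read parallelism, unioned so each batch yields one job,
// then repartitioned for processing parallelism
val s1 = ssc.socketTextStream("host1", 9999)
val s2 = ssc.socketTextStream("host2", 9999)
val unioned = s1.union(s2).repartition(8)
unioned.foreachRDD { rdd => rdd.count() }           // one Spark job per batch

ssc.start()
ssc.awaitTermination()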
Re: Multi user setup and saving a DataFrame / RDD to a network exported file system
Could you file a JIRA for this? The executor should run under the user who submits the job, I think. On Wed, May 20, 2015 at 2:40 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl wrote: Thanks for the suggestion. I have tried playing with it, sc.sparkUser() gives me the expected user name, but it doesn't solve the problem. From a quick search through the spark code it seems to me that this setting is effective only for yarn and mesos. I think the workaround for the problem could be using --deploy-mode cluster (not 100% convenient, since it disallows any interactive work), but this is not supported for python based programs. Cheers, Tomasz On 20.05.2015 at 10:57, Iulian Dragoș wrote: You could try setting `SPARK_USER` to the user under which your workers are running. I couldn't find many references to this variable, but at least Yarn and Mesos take it into account when spawning executors. Chances are that standalone mode also does it. iulian On Wed, May 20, 2015 at 9:29 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl wrote: Hi, thanks for the answer. The rights are drwxr-xr-x 3 tfruboes all 5632 05-19 15:40 test19EE/ I have tried setting the rights to 777 for this directory prior to execution. This does not get propagated down the chain, i.e. the directory created as a result of the save call (namesAndAges.parquet2 in the path in the dump [1] below) is created with the drwxr-xr-x rights (owned by the user submitting the job, i.e. tfruboes). The temp directories created inside namesAndAges.parquet2/_temporary/0/ (e.g. task_201505200920_0009_r_01) are owned by root, again with drwxr-xr-x access rights. Cheers, Tomasz On 19.05.2015 at 23:56, Davies Liu wrote: It surprises me, could you list the owner information of /mnt/lustre/bigdata/med_home/tmp/test19EE/ ? On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes tomasz.frub...@fuw.edu.pl wrote: Dear Experts, we have a spark cluster (standalone mode) in which master and workers are started from the root account. Everything runs correctly to the point when we try doing operations such as dataFrame.select("name", "age").save(ofile, "parquet") or rdd.saveAsPickleFile(ofile), where ofile is a path on a network exported filesystem (visible on all nodes; in our case this is lustre, I guess on nfs the effect would be similar). Unsurprisingly the temp files created on workers are owned by root, which then leads to a crash (see [1] below). Is there a solution/workaround for this (e.g. controlling the file creation mode of the temporary files)? Cheers, Tomasz ps I've tried to google this problem, a couple of similar reports, but no clear answer/solution found ps2 For completeness - running master/workers as a regular user solves the problem only for the given user. For other users submitting to this master the result is given in [2] below [0] Cluster details: Master/workers: centos 6.5 Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build) [1] ## File /mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o27.save.
: java.io.IOException: Failed to rename DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet; isDirectory=false; length=534; replication=1; blocksize=33554432; modification_time=1432042832000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false} to file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362) at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310) at
Re: PySpark Logs location
Hi Ruslan. Could you add more details please. Where do I get applicationId? In case I have a lot of log files would it make sense to view it from single point. How actually I can configure / manage log location of PySpark? Thanks Oleg. On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: You could use yarn logs -applicationId application_1383601692319_0008 -- Ruslan Dautkhanov On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I am executing PySpark job on yarn ( hortonworks distribution). Could someone pointing me where is the log locations? Thanks Oleg.
Re: FP Growth saveAsTextFile
Could you post the stack trace? If you are using Spark 1.3 or 1.4, it would be easier to save freq itemsets as a Parquet file. -Xiangrui On Wed, May 20, 2015 at 12:16 PM, Eric Tanner eric.tan...@justenough.com wrote: I am having trouble with saving an FP-Growth model as a text file. I can print out the results, but when I try to save the model I get a NullPointerException. model.freqItemsets.saveAsTextFile("c://fpGrowth/model") Thanks, Eric - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
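For reference, a minimal Scala sketch of the Parquet route mentioned above (it assumes Spark 1.3+, an existing sqlContext, and that flattening each itemset to a string is acceptable; the output path is a placeholder):

import sqlContext.implicits._

// model is the FPGrowthModel produced by training
val freqDF = model.freqItemsets
  .map(fi => (fi.items.mkString(","), fi.freq))
  .toDF("items", "freq")
freqDF.saveAsParquetFile("hdfs:///tmp/fpGrowth/freqItemsets.parquet")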
Read multiple files from S3
Hi, I am trying to get a collection of files according to LastModifiedDate from S3:

List<String> FileNames = new ArrayList<String>();
ListObjectsRequest listObjectsRequest = new ListObjectsRequest()
    .withBucketName(s3_bucket)
    .withPrefix(logs_dir);
ObjectListing objectListing;
do {
    objectListing = s3Client.listObjects(listObjectsRequest);
    for (S3ObjectSummary objectSummary : objectListing.getObjectSummaries()) {
        if ((objectSummary.getLastModified().compareTo(dayBefore) > 0)
                && (objectSummary.getLastModified().compareTo(dayAfter) < 1)
                && objectSummary.getKey().contains(".log"))
            FileNames.add(objectSummary.getKey());
    }
    listObjectsRequest.setMarker(objectListing.getNextMarker());
} while (objectListing.isTruncated());

I would like to process these files using Spark. I understand that textFile reads a single text file. Is there any way to read all these files that are part of the List? Thanks for your help. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Read-multiple-files-from-S3-tp22965.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
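One possible answer, sketched in Scala (the same idea works from Java): textFile accepts a comma-separated list of paths, so the collected keys can be joined into a single call. The bucket name, URI scheme and variable names below are placeholders:

// fileNames: the S3 keys collected by the listing code above, as a Seq[String]
val paths = fileNames.map(key => s"s3n://my-bucket/$key").mkString(",")
val logs = sc.textFile(paths)   // textFile takes a comma-separated list of files
println(logs.count())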
Re: User Defined Type (UDT)
Probably in 1.5. I made a JIRA for it: https://issues.apache.org/jira/browse/SPARK-7768. You can watch that JIRA (and vote). -Xiangrui On Wed, May 20, 2015 at 11:03 AM, Justin Uang justin.u...@gmail.com wrote: Xiangrui, is there a timeline for when UDTs will become a public API? I'm currently using them to support java 8's ZonedDateTime. On Tue, May 19, 2015 at 3:14 PM Xiangrui Meng men...@gmail.com wrote: (Note that UDT is not a public API yet.) On Thu, May 7, 2015 at 7:11 AM, wjur wojtek.jurc...@gmail.com wrote: Hi all! I'm using Spark 1.3.0 and I'm struggling with the definition of a new type for a project I'm working on. I've created a case class Person(name: String) and now I'm trying to make Spark able to serialize and deserialize the defined type. I made a couple of attempts but none of them worked 100% (there were issues either in serialization or deserialization). This is my class and the corresponding UDT. @SQLUserDefinedType(udt = classOf[PersonUDT]) case class Person(name: String) class PersonUDT extends UserDefinedType[Person] { override def sqlType: DataType = StructType(Seq(StructField("name", StringType))) override def serialize(obj: Any): Seq[Any] = { This should return a Row instance instead of Seq[Any], because the sqlType is a struct type. obj match { case c: Person => Seq(c.name) } } override def userClass: Class[Person] = classOf[Person] override def deserialize(datum: Any): Person = { datum match { case values: Seq[_] => assert(values.length == 1) Person(values.head.asInstanceOf[String]) case values: util.ArrayList[_] => Person(values.get(0).asInstanceOf[String]) } } // In some other attempt I was creating RDD of Seq with manually serialized data and // I had to override equals because two DFs with the same type weren't actually equal // StructField(person,...types.PersonUDT@a096ac3) // StructField(person,...types.PersonUDT@613fd937) def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT] override def equals(other: Any): Boolean = other match { case that: PersonUDT => true case _ => false } override def hashCode(): Int = 1 } This is how I create an RDD of Person and then try to create a DataFrame val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString))) val sparkDataFrame = sqlContext.createDataFrame(rdd) The second line throws an exception: java.lang.ClassCastException: types.PersonUDT cannot be cast to org.apache.spark.sql.types.StructType at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316) I looked into the code in SQLContext.scala and it seems that the code requires the UDT to extend StructType, but in fact it extends UserDefinedType which directly extends DataType. I'm not sure whether it is a bug or I just don't know how to use UDTs. Do you have any suggestions how to solve this? I based my UDT on ExamplePointUDT but it seems to be incorrect. Is there a working example for UDT? Thank you for the reply in advance! wjur -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
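To make the inline comment about returning a Row concrete, a rough sketch of how the serialize/deserialize pair might look (untested, written against the non-public 1.3 UDT API, so treat it only as a direction):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

class PersonUDT extends UserDefinedType[Person] {
  override def sqlType: DataType = StructType(Seq(StructField("name", StringType)))

  // encode the object as a Row that matches sqlType
  override def serialize(obj: Any): Any = obj match {
    case p: Person => Row(p.name)
  }

  override def deserialize(datum: Any): Person = datum match {
    case row: Row => Person(row.getString(0))
  }

  override def userClass: Class[Person] = classOf[Person]
}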
Re: PySpark Logs location
Oleg, You can see applicationId in your Spark History Server. Go to http://historyserver:18088/ Also check https://spark.apache.org/docs/1.1.0/running-on-yarn.html#debugging-your-application It should be no different with PySpark. -- Ruslan Dautkhanov On Wed, May 20, 2015 at 2:12 PM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi Ruslan. Could you add more details please. Where do I get applicationId? In case I have a lot of log files would it make sense to view it from single point. How actually I can configure / manage log location of PySpark? Thanks Oleg. On Wed, May 20, 2015 at 10:24 PM, Ruslan Dautkhanov dautkha...@gmail.com wrote: You could use yarn logs -applicationId application_1383601692319_0008 -- Ruslan Dautkhanov On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I am executing PySpark job on yarn ( hortonworks distribution). Could someone pointing me where is the log locations? Thanks Oleg.
Storing data in MySQL from spark hive tables
Hi, I am trying to set up the hive metastore and MySQL DB connection. I have a spark cluster and I ran some programs and I have data stored in some hive tables. Now I want to store this data into MySQL so that it is available for further processing. I set up the hive-site.xml file:

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hive.semantic.analyzer.factory.impl</name>
    <value>org.apache.hcatalog.cli.HCatSemanticAnalyzerFactory</value>
  </property>
  <property>
    <name>hive.metastore.sasl.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>NONE</value>
  </property>
  <property>
    <name>hive.server2.enable.doAs</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.warehouse.subdir.inherit.perms</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.schema.verification</name>
    <value>false</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionURL</name>
    <value>jdbc:mysql://*ip address*:3306/metastore_db?createDatabaseIfNotExist=true</value>
    <description>metadata is stored in a MySQL server</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionDriverName</name>
    <value>com.mysql.jdbc.Driver</value>
    <description>MySQL JDBC driver class</description>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionUserName</name>
    <value>root</value>
  </property>
  <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value></value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/user/${user.name}/hive-warehouse</value>
    <description>location of default database for the warehouse</description>
  </property>
</configuration>

-- My MySQL server is on a separate server from where my spark server is. If I use MySQL Workbench, I use an SSH connection with a certificate file to connect. How do I specify all that information from spark to the DB? I want to store the data generated by my spark program into MySQL. Thanks _R
Re: User Defined Type (UDT)
Xiangrui, is there a timeline for when UDTs will become a public API? I'm currently using them to support java 8's ZonedDateTime. On Tue, May 19, 2015 at 3:14 PM Xiangrui Meng men...@gmail.com wrote: (Note that UDT is not a public API yet.) On Thu, May 7, 2015 at 7:11 AM, wjur wojtek.jurc...@gmail.com wrote: Hi all! I'm using Spark 1.3.0 and I'm struggling with a definition of a new type for a project I'm working on. I've created a case class Person(name: String) and now I'm trying to make Spark to be able serialize and deserialize the defined type. I made a couple of attempts but none of them did not work in 100% (there were issues either in serialization or deserialization). This is my class and the corresponding UDT. @SQLUserDefinedType(udt = classOf[PersonUDT]) case class Person(name: String) class PersonUDT extends UserDefinedType[Person] { override def sqlType: DataType = StructType(Seq(StructField(name, StringType))) override def serialize(obj: Any): Seq[Any] = { This should return a Row instance instead of Seq[Any], because the sqlType is a struct type. obj match { case c: Person = Seq(c.name) } } override def userClass: Class[Person] = classOf[Person] override def deserialize(datum: Any): Person = { datum match { case values: Seq[_] = assert(values.length == 1) Person(values.head.asInstanceOf[String]) case values: util.ArrayList[_] = Person(values.get(0).asInstanceOf[String]) } } // In some other attempt I was creating RDD of Seq with manually serialized data and // I had to override equals because two DFs with the same type weren't actually equal // StructField(person,...types.PersonUDT@a096ac3) // StructField(person,...types.PersonUDT@613fd937) def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT] override def equals(other: Any): Boolean = other match { case that: PersonUDT = true case _ = false } override def hashCode(): Int = 1 } This is how I create RDD of Person and then try to create a DataFrame val rdd = sparkContext.parallelize((1 to 100).map(i = Person(i.toString))) val sparkDataFrame = sqlContext.createDataFrame(rdd) The second line throws an exception: java.lang.ClassCastException: types.PersonUDT cannot be cast to org.apache.spark.sql.types.StructType at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316) I looked into the code in SQLContext.scala and it seems that the code requires UDT to be extending StructType but in fact it extends UserDefinedType which extends directly DataType. I'm not sure whether it is a bug or I just don't know how to use UDTs. Do you have any suggestions how to solve this? I based my UDT on ExamplePointUDT but it seems to be incorrect. Is there a working example for UDT? Thank you for the reply in advance! wjur -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark users
Yes, this is the user group. Feel free to ask your questions in this list. Thanks Best Regards On Wed, May 20, 2015 at 5:58 AM, Ricardo Goncalves da Silva ricardog.si...@telefonica.com wrote: Hi, I'm learning spark focused on data and machine learning. Migrating from SAS. Is there a group for it? My questions are basic for now and I am getting very few answers. Tal Rick. Sent from my Samsung Galaxy smartphone.
Fwd: Re: spark 1.3.1 jars in repo1.maven.org
Hi Sean and Ted, Thanks for your replies. I don't have our current problems nicely written up as good questions yet. I'm still sorting out classpath issues, etc. In case it is of help, I'm seeing: * Exception in thread Spark Context Cleaner java.lang.NoClassDefFoundError: 0 at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:149) * We've been having clashing dependencies between a colleague and I because of the aforementioned classpath issue * The clashing dependencies are also causing issues with what jetty libraries are available in the classloader from Spark and don't clash with existing libraries we have. More anon, Cheers, Edward Original Message Subject: Re: spark 1.3.1 jars in repo1.maven.org Date: 2015-05-20 00:38 From: Sean Owen so...@cloudera.com To: Edward Sargisson esa...@pobox.com Cc: user user@spark.apache.org Yes, the published artifacts can only refer to one version of anything (OK, modulo publishing a large number of variants under classifiers). You aren't intended to rely on Spark's transitive dependencies for anything. Compiling against the Spark API has no relation to what version of Hadoop it binds against because it's not part of any API. You mark the Spark dependency even as provided in your build and get all the Spark/Hadoop bindings at runtime from our cluster. What problem are you experiencing? On Wed, May 20, 2015 at 2:17 AM, Edward Sargisson esa...@pobox.com wrote: Hi, I'd like to confirm an observation I've just made. Specifically that spark is only available in repo1.maven.org for one Hadoop variant. The Spark source can be compiled against a number of different Hadoops using profiles. Yay. However, the spark jars in repo1.maven.org appear to be compiled against one specific Hadoop and no other differentiation is made. (I can see a difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in the version I compiled locally). The implication here is that if you have a pom file asking for spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2 version. Maven assumes that non-snapshot artifacts never change so trying to load an Hadoop 1 version will end in tears. This then means that if you compile code against spark-core then there will probably be classpath NoClassDefFound issues unless the Hadoop 2 version is exactly the one you want. Have I gotten this correct? It happens that our little app is using a Spark context directly from a Jetty webapp and the classpath differences were/are causing some confusion. We are currently installing a Hadoop 1 spark master and worker. Thanks a lot! Edward
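For reference, the provided arrangement Sean describes might look like this in an sbt build (the Maven equivalent is a <scope>provided</scope> element on the same artifact; the version is simply the one discussed in this thread):

// build.sbt: compile against Spark but do not bundle it;
// the Spark/Hadoop jars come from the cluster at runtime
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.3.1" % "provided"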
Re: --jars works in yarn-client but not yarn-cluster mode, why?
Hello, Sorry for the delay. The issue you're running into is because most HBase classes are in the system class path, while jars added with --jars are only visible to the application class loader created by Spark. So classes in the system class path cannot see them. You can work around this by setting --driver-classpath /opt/.../htrace-core-3.1.0-incubating.jar and --conf spark.executor.extraClassPath= /opt/.../htrace-core-3.1.0-incubating.jar in your spark-submit command line. (You can also add those configs to your spark-defaults.conf to avoid having to type them all the time; and don't forget to include any other jars that might be needed.) On Mon, May 18, 2015 at 11:14 PM, Fengyun RAO raofeng...@gmail.com wrote: Thanks, Marcelo! Below is the full log, SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/avro-tools-1.7.6-cdh5.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/05/19 14:08:58 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT] 15/05/19 14:08:59 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1432015548391_0003_01 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nobody, raofengyun); users with modify permissions: Set(nobody, raofengyun) 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context initialization 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 15/05/19 14:09:00 INFO spark.SparkContext: Running Spark version 1.3.0 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nobody, raofengyun); users with modify permissions: Set(nobody, raofengyun) 15/05/19 14:09:01 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/05/19 14:09:01 INFO Remoting: Starting remoting 15/05/19 14:09:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@gs-server-v-127:7191] 15/05/19 14:09:01 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@gs-server-v-127:7191] 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'sparkDriver' on port 7191. 
15/05/19 14:09:01 INFO spark.SparkEnv: Registering MapOutputTracker 15/05/19 14:09:01 INFO spark.SparkEnv: Registering BlockManagerMaster 15/05/19 14:09:01 INFO storage.DiskBlockManager: Created local directory at /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/blockmgr-3250910b-693e-46ff-b057-26d552fd8abd 15/05/19 14:09:01 INFO storage.MemoryStore: MemoryStore started with capacity 259.7 MB 15/05/19 14:09:01 INFO spark.HttpFileServer: HTTP File server directory is /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/httpd-5bc614bc-d8b1-473d-a807-4d9252eb679d 15/05/19 14:09:01 INFO spark.HttpServer: Starting HTTP Server 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/19 14:09:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:9349 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'HTTP file server' on port 9349. 15/05/19 14:09:01 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/05/19 14:09:01 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/19 14:09:01 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:63023 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'SparkUI' on port 63023. 15/05/19 14:09:01 INFO ui.SparkUI: Started SparkUI at http://gs-server-v-127:63023 15/05/19 14:09:02 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler 15/05/19 14:09:02 INFO netty.NettyBlockTransferService: Server created on 33526 15/05/19 14:09:02 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/05/19 14:09:02 INFO storage.BlockManagerMasterActor: Registering block manager gs-server-v-127:33526 with 259.7 MB RAM,
Re: PySpark Logs location
You could use yarn logs -applicationId application_1383601692319_0008 -- Ruslan Dautkhanov On Wed, May 20, 2015 at 5:37 AM, Oleg Ruchovets oruchov...@gmail.com wrote: Hi , I am executing PySpark job on yarn ( hortonworks distribution). Could someone pointing me where is the log locations? Thanks Oleg.
IPv6 support
Hello, I have to work with IPv6 only servers and when I installed the 1.3.1 hadoop 2.6 build, I couldn't get the example to run due to IPv6 issues (errors below). I tried to add the -Djava.net.preferIPv6Addresses=true setting but it still doesn't work. A search on Spark's support for IPv6 is inconclusive. Can someone help clarify the current status for IPv6? Thanks Kevin -- errors -- 15/05/20 10:17:30 INFO Executor: Fetching http://2401:db00:2030:709b:face:0:9:0:51453/jars/spark-examples-1.3.1-hadoop2.6.0.jar with timestamp 1432142250197 15/05/20 10:17:30 INFO Executor: Fetching http://2401:db00:2030:709b:face:0:9:0:51453/jars/spark-examples-1.3.1-hadoop2.6.0.jar with timestamp 1432142250197 15/05/20 10:17:30 ERROR Executor: Exception in task 5.0 in stage 0.0 (TID 5) java.net.MalformedURLException: For input string: db00:2030:709b:face:0:9:0:51453 at java.net.URL.<init>(URL.java:620) at java.net.URL.<init>(URL.java:483) at java.net.URL.<init>(URL.java:432) at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:603) at org.apache.spark.util.Utils$.fetchFile(Utils.scala:431) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:374) at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:366) at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98) at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226) at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39) at scala.collection.mutable.HashMap.foreach(HashMap.scala:98) at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771) at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:366) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:184) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: java.lang.NumberFormatException: For input string: db00:2030:709b:face:0:9:0:51453 at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at java.net.URLStreamHandler.parseURL(URLStreamHandler.java:216) at java.net.URL.<init>(URL.java:615) ... 18 more - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo
Could you please open a JIRA for it? The maxBins input is missing for the Python Api. Is it possible if you can use the current master? In the current master, you should be able to use trees with the Pipeline Api and DataFrames. Best, Burak On Wed, May 20, 2015 at 2:44 PM, Don Drake dondr...@gmail.com wrote: I'm running Spark v1.3.1 and when I run the following against my dataset: model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatu res, maxDepth=6, numIterations=3) The job will fail with the following message: Traceback (most recent call last): File /Users/drake/fd/spark/mltest.py, line 73, in module model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py, line 553, in trainRegressor loss, numIterations, learningRate, maxDepth) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py, line 438, in _train loss, numIterations, learningRate, maxDepth) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py, line 120, in callMLlibFunc return callJavaFunc(sc, api, *args) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py, line 113, in callJavaFunc return _java2py(sc, func(*args)) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value 15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95 py4j.protocol.Py4JJavaError: An error occurred while calling o69.trainGradientBoostedTreesModel. : java.lang.IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) = max categories in categorical features (= 1895) at scala.Predef$.require(Predef.scala:233) at org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128) at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138) at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60) at org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150) at org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63) at org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96) at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595) So, it's complaining about the maxBins, if I provide maxBins=1900 and re-run it: model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatu res, maxDepth=6, numIterations=3, maxBins=1900) Traceback (most recent call last): File /Users/drake/fd/spark/mltest.py, line 73, in module model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catF eatures, maxDepth=6, numIterations=3, maxBins=1900) TypeError: trainRegressor() got an unexpected keyword argument 'maxBins' It now says it knows nothing of maxBins. If I run the same command against DecisionTree or RandomForest (with maxBins=1900) it works just fine. Seems like a bug in GradientBoostedTrees. Suggestions? -Don -- Donald Drake Drake Consulting http://www.drakeconsulting.com/ 800-733-2143
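Until the Python API exposes maxBins, one possible workaround is to drive the training through the Scala MLlib API, where maxBins sits on the tree strategy. A sketch only (it assumes trainingData is an RDD[LabeledPoint] and catFeatures a Map[Int, Int]):

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy

val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 3
boostingStrategy.treeStrategy.maxDepth = 6
boostingStrategy.treeStrategy.maxBins = 1900
boostingStrategy.treeStrategy.categoricalFeaturesInfo = catFeatures
val model = GradientBoostedTrees.train(trainingData, boostingStrategy)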
Re: Storing data in MySQL from spark hive tables
I'm afraid you misunderstand the purpose of hive-site.xml. It configures access to the Hive metastore. You can read more here: http://www.hadoopmaterial.com/2013/11/metastore.html. So the MySQL DB in hive-site.xml would be used to store hive-specific data such as schema info, partition info, etc. Now, for what you want to do, you can search the user list -- I know there have been posts about Postgres but you can do the same with MySQL. The idea is to create an object holding a connection pool (so each of your executors would have its own instance), or alternately, to open a connection within mapPartitions (so you don't end up with a ton of connections). But the write to a DB is largely a manual process -- open a connection, create a statement, sync the data. If your data is small enough you probably could just collect on the driver and write...though that would certainly be slower than writing in parallel from each executor. On Wed, May 20, 2015 at 5:48 PM, roni roni.epi...@gmail.com wrote: Hi , I am trying to setup the hive metastore and mysql DB connection. I have a spark cluster and I ran some programs and I have data stored in some hive tables. Now I want to store this data into Mysql so that it is available for further processing. I setup the hive-site.xml file. ?xml version=1.0? ?xml-stylesheet type=text/xsl href=configuration.xsl? configuration property namehive.semantic.analyzer.factory.impl/name valueorg.apache.hcatalog.cli.HCatSemanticAnalyzerFactory/value /property property namehive.metastore.sasl.enabled/name valuefalse/value /property property namehive.server2.authentication/name valueNONE/value /property property namehive.server2.enable.doAs/name valuetrue/value /property property namehive.warehouse.subdir.inherit.perms/name valuetrue/value /property property namehive.metastore.schema.verification/name valuefalse/value /property property namejavax.jdo.option.ConnectionURL/name valuejdbc:mysql://*ip address* :3306/metastore_db?createDatabaseIfNotExist=true/value descriptionmetadata is stored in a MySQL server/description /property property namejavax.jdo.option.ConnectionDriverName/name valuecom.mysql.jdbc.Driver/value descriptionMySQL JDBC driver class/description /property property namejavax.jdo.option.ConnectionUserName/name valueroot/value /property property namejavax.jdo.option.ConnectionPassword/name value/value /property property namehive.metastore.warehouse.dir/name value/user/${user.name}/hive-warehouse/value descriptionlocation of default database for the warehouse/description /property /configuration -- My mysql server is on a separate server than where my spark server is . If I use mySQLWorkbench , I use a SSH connection with a certificate file to connect . How do I specify all that information from spark to the DB ? I want to store the data generated by my spark program into mysql. Thanks _R
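A minimal Scala sketch of the per-partition connection idea described above (the URL, credentials, table and column accessors are placeholders; error handling and batching are omitted, and the MySQL JDBC driver has to be on the executor classpath):

import java.sql.DriverManager

// df: the DataFrame read from the hive table
df.foreachPartition { rows =>
  // one connection per partition, not per row
  val conn = DriverManager.getConnection("jdbc:mysql://mysql-host:3306/mydb", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO results (name, age) VALUES (?, ?)")
  rows.foreach { row =>
    stmt.setString(1, row.getString(0))
    stmt.setInt(2, row.getInt(1))
    stmt.executeUpdate()
  }
  stmt.close()
  conn.close()
}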
Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)
Hi Xin, If you take a look at the model you trained, the intercept from Spark is significantly smaller than StatsModel, and the intercept represents a prior on categories in LOR which causes the low accuracy in Spark implementation. In LogisticRegressionWithLBFGS, the intercept is regularized due to the implementation of Updater, and the intercept should not be regularized. In the new pipleline APIs, a LOR with elasticNet is implemented, and the intercept is properly handled. https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala As you can see the tests, https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala the result is exactly the same as R now. BTW, in both version, the feature scalings are done before training, and we train the model in scaled space but transform the model weights back to original space. The only difference is in the mllib version, LogisticRegressionWithLBFGS regularizes the intercept while in the ml version, the intercept is excluded from regularization. As a result, if lambda is zero, the model should be the same. On Wed, May 20, 2015 at 3:42 PM, Xin Liu liuxin...@gmail.com wrote: Hi, I have tried a few models in Mllib to train a LogisticRegression model. However, I consistently get much better results using other libraries such as statsmodel (which gives similar results as R) in terms of AUC. For illustration purpose, I used a small data (I have tried much bigger data) http://www.ats.ucla.edu/stat/data/binary.csv in http://www.ats.ucla.edu/stat/r/dae/logit.htm Here is the snippet of my usage of LogisticRegressionWithLBFGS. val algorithm = new LogisticRegressionWithLBFGS algorithm.setIntercept(true) algorithm.optimizer .setNumIterations(100) .setRegParam(0.01) .setConvergenceTol(1e-5) val model = algorithm.run(training) model.clearThreshold() val scoreAndLabels = test.map { point = val score = model.predict(point.features) (score, point.label) } val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() I did a (0.6, 0.4) split for training/test. The response is admit and features are GRE score, GPA, and college Rank. Spark: Weights (GRE, GPA, Rank): [0.0011576276331509304,0.048544858567336854,-0.394202150286076] Intercept: -0.6488972641282202 Area under ROC: 0.6294070512820512 StatsModel: Weights [0.0018, 0.7220, -0.3148] Intercept: -3.5913 Area under ROC: 0.69 The weights from statsmodel seems more reasonable if you consider for a one unit increase in gpa, the log odds of being admitted to graduate school increases by 0.72 in statsmodel than 0.04 in Spark. I have seen much bigger difference with other data. So my question is has anyone compared the results with other libraries and is anything wrong with my code to invoke LogisticRegressionWithLBFGS? As the real data I am processing is pretty big and really want to use Spark to get this to work. Please let me know if you have similar experience and how you resolve it. Thanks, Xin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spatial function in spark
Hello, I am fairly new to Spark and Python programming. I have an RDD with polygons, and I need to perform spatial joins, geohash calculations and other spatial operations on these RDDs in parallel. I run Spark jobs on a YARN cluster and develop Spark applications in Python. So, can you please suggest some pointers on how to enable spatial support for Spark applications? Thanks!
Spark Application Dependency Issue
Hi All, I am on spark 1.1 with Datastax DSE. The application is Spark Streaming and has Couchbase dependencies which use http-core 4.3.2. While running the application I get this error: NoSuchMethodError: org.apache.http.protocol.RequestUserAgent.<init>(Ljava/lang/String;)V at com.couchbase.client.ViewConnection.<init>(ViewConnection.java:157) at com.couchbase.client.CouchbaseConnectionFactory.createViewConnection(CouchbaseConnectionFactory.java:254) at com.couchbase.client.CouchbaseClient.<init>(CouchbaseClient.java:266) at com.walmart.platform.cache.CouchBaseFactoryImpl.create(CouchBaseFactoryImpl.java:76) There are different versions of the http-core dependencies in the spark classpath, http-core 4.1.3 and http-core 4.2.4. My application uses 4.3.2. I tried using the user-classpath-first option but it does not work for me since I am on spark 1.1. Any help or pointers would be really useful, Thanks, Snehal
Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)
Hi, I have tried a few models in Mllib to train a LogisticRegression model. However, I consistently get much better results using other libraries such as statsmodel (which gives similar results as R) in terms of AUC. For illustration purposes, I used a small data set (I have tried much bigger data) http://www.ats.ucla.edu/stat/data/binary.csv in http://www.ats.ucla.edu/stat/r/dae/logit.htm Here is the snippet of my usage of LogisticRegressionWithLBFGS. val algorithm = new LogisticRegressionWithLBFGS algorithm.setIntercept(true) algorithm.optimizer .setNumIterations(100) .setRegParam(0.01) .setConvergenceTol(1e-5) val model = algorithm.run(training) model.clearThreshold() val scoreAndLabels = test.map { point => val score = model.predict(point.features) (score, point.label) } val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() I did a (0.6, 0.4) split for training/test. The response is admit and the features are GRE score, GPA, and college Rank. Spark: Weights (GRE, GPA, Rank): [0.0011576276331509304,0.048544858567336854,-0.394202150286076] Intercept: -0.6488972641282202 Area under ROC: 0.6294070512820512 StatsModel: Weights [0.0018, 0.7220, -0.3148] Intercept: -3.5913 Area under ROC: 0.69 The weights from statsmodel seem more reasonable if you consider that for a one unit increase in gpa, the log odds of being admitted to graduate school increase by 0.72 in statsmodel versus 0.04 in Spark. I have seen a much bigger difference with other data. So my question is: has anyone compared the results with other libraries, and is anything wrong with my code to invoke LogisticRegressionWithLBFGS? The real data I am processing is pretty big and I really want to use Spark to get this to work. Please let me know if you have similar experience and how you resolved it. Thanks, Xin
How to process data in chronological order
I have a key-value RDD, key is a timestamp (femto-second resolution, so grouping buys me nothing) and I want to reduce it in the chronological order. How do I do that in spark? I am fine with reducing contiguous sections of the set separately and then aggregating the resulting objects locally. Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-process-data-in-chronological-order-tp22966.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
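One possible shape for that in Scala (sortByKey gives a range partitioning, so each partition is a contiguous slice of time; combine stands for whatever associative reduction applies to your values, and the value type is a placeholder):

// rdd: RDD[(Long, V)] keyed by the femtosecond timestamp
val sorted = rdd.sortByKey()
val partials = sorted
  .mapPartitionsWithIndex { (idx, iter) =>
    if (iter.isEmpty) Iterator.empty
    else Iterator((idx, iter.map(_._2).reduce(combine)))   // reduce each contiguous slice in order
  }
  .collect()
  .sortBy(_._1)
  .map(_._2)
val result = partials.reduce(combine)   // aggregate the slice results locally, still in order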
Re: GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo
One more comment: That's a lot of categories for a feature. If it makes sense for your data, it will run faster if you can group the categories or split the 1895 categories into a few features which have fewer categories. On Wed, May 20, 2015 at 3:17 PM, Burak Yavuz brk...@gmail.com wrote: Could you please open a JIRA for it? The maxBins input is missing for the Python Api. Is it possible if you can use the current master? In the current master, you should be able to use trees with the Pipeline Api and DataFrames. Best, Burak On Wed, May 20, 2015 at 2:44 PM, Don Drake dondr...@gmail.com wrote: I'm running Spark v1.3.1 and when I run the following against my dataset: model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatu res, maxDepth=6, numIterations=3) The job will fail with the following message: Traceback (most recent call last): File /Users/drake/fd/spark/mltest.py, line 73, in module model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py, line 553, in trainRegressor loss, numIterations, learningRate, maxDepth) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py, line 438, in _train loss, numIterations, learningRate, maxDepth) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py, line 120, in callMLlibFunc return callJavaFunc(sc, api, *args) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py, line 113, in callJavaFunc return _java2py(sc, func(*args)) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value 15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95 py4j.protocol.Py4JJavaError: An error occurred while calling o69.trainGradientBoostedTreesModel. : java.lang.IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) = max categories in categorical features (= 1895) at scala.Predef$.require(Predef.scala:233) at org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128) at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138) at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60) at org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150) at org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63) at org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96) at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595) So, it's complaining about the maxBins, if I provide maxBins=1900 and re-run it: model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatu res, maxDepth=6, numIterations=3, maxBins=1900) Traceback (most recent call last): File /Users/drake/fd/spark/mltest.py, line 73, in module model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catF eatures, maxDepth=6, numIterations=3, maxBins=1900) TypeError: trainRegressor() got an unexpected keyword argument 'maxBins' It now says it knows nothing of maxBins. 
If I run the same command against DecisionTree or RandomForest (with maxBins=1900) it works just fine. Seems like a bug in GradientBoostedTrees. Suggestions? -Don -- Donald Drake Drake Consulting http://www.drakeconsulting.com/ 800-733-2143
Re: Spark 1.3.1 - SQL Issues
Thanks a bunch On 21 May 2015 07:11, Davies Liu dav...@databricks.com wrote: The docs had been updated. You should convert the DataFrame to RDD by `df.rdd` On Mon, Apr 20, 2015 at 5:23 AM, ayan guha guha.a...@gmail.com wrote: Hi Just upgraded to Spark 1.3.1. I am getting an warning Warning (from warnings module): File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\sql\context.py, line 191 warnings.warn(inferSchema is deprecated, please use createDataFrame instead) UserWarning: inferSchema is deprecated, please use createDataFrame instead However, documentation still says to use inferSchema. Here: http://spark.apache.org/docs/latest/sql-programming-guide.htm in section Also, I am getting an error in mlib.ALS.train function when passing dataframe (do I need to convert the DF to RDD?) Code: training = ssc.sql(select userId,movieId,rating from ratings where partitionKey 6).cache() print type(training) model = ALS.train(training,rank,numIter,lmbda) Error: class 'pyspark.sql.dataframe.DataFrame' Rank:8 Lmbda:1.0 iteration:10 Traceback (most recent call last): File D:\Project\Spark\code\movie_sql.py, line 109, in module bestConf = getBestModel(sc,ssc,training,validation,validationNoRating) File D:\Project\Spark\code\movie_sql.py, line 54, in getBestModel model = ALS.train(trainingRDD,rank,numIter,lmbda) File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 139, in train model = callMLlibFunc(trainALSModel, cls._prepare(ratings), rank, iterations, File D:\spark\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\spark-1.3.1-bin-hadoop2.6\python\pyspark\mllib\recommendation.py, line 127, in _prepare assert isinstance(ratings, RDD), ratings should be RDD AssertionError: ratings should be RDD -- Best Regards, Ayan Guha
Re: Spark Job not using all nodes in cluster
No. I am not setting the number of executors anywhere (in env file or in program). Is it due to large number of small files ? On Wed, May 20, 2015 at 5:11 PM, ayan guha guha.a...@gmail.com wrote: What is your spark env file says? Are you setting number of executors in spark context? On 20 May 2015 13:16, Shailesh Birari sbirar...@gmail.com wrote: Hi, I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB of RAM. I have around 600,000+ Json files on HDFS. Each file is small around 1KB in size. Total data is around 16GB. Hadoop block size is 256MB. My application reads these files with sc.textFile() (or sc.jsonFile() tried both) API. But all the files are getting read by only one node (4 executors). Spark UI shows all 600K+ tasks on one node and 0 on other nodes. I confirmed that all files are accessible from all nodes. Some other application which uses big files uses all nodes on same cluster. Can you please let me know why it is behaving in such way ? Thanks, Shailesh -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-using-all-nodes-in-cluster-tp22951.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
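If it does turn out to be the small-files layout rather than executor settings, one thing worth trying (only a sketch, not a diagnosis; path and counts are placeholders) is to ask for more input partitions up front and rebalance them before the heavy work:

val raw = sc.textFile("hdfs:///path/to/json/*", 400)   // minPartitions hint at read time
val balanced = raw.repartition(16)                     // shuffle so the work spreads across all nodes
println(balanced.count())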
Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)
Hi Xin, 2 suggestions: 1) Feature scaling: spark.mllib's LogisticRegressionWithLBFGS uses feature scaling, which scales feature values to have unit standard deviation. That improves optimization behavior, and it often improves statistical estimation (though maybe not for your dataset). However, it effectively changes the model being learned, so you should expect different results from other libraries like R. You could instead use LogisticRegressionWithSGD, which does not do feature scaling. With SGD, you may need to play around with the stepSize more to get it to converge, but it should be able to learn exactly the same model as R. 2) Convergence: I'd do a sanity check and make sure the algorithm is converging. (Compare with running for more iterations or using a lower convergenceTol.) Note: If you can use the Spark master branch (or wait for Spark 1.4), then the spark.ml Pipelines API will be a good option. It now has LogisticRegression which does not do feature scaling, and it uses LBFGS or OWLQN (depending on the regularization type) for optimization. It's also been compared with R in unit tests. Good luck! Joseph On Wed, May 20, 2015 at 3:42 PM, Xin Liu liuxin...@gmail.com wrote: Hi, I have tried a few models in Mllib to train a LogisticRegression model. However, I consistently get much better results using other libraries such as statsmodel (which gives similar results as R) in terms of AUC. For illustration purpose, I used a small data (I have tried much bigger data) http://www.ats.ucla.edu/stat/data/binary.csv in http://www.ats.ucla.edu/stat/r/dae/logit.htm Here is the snippet of my usage of LogisticRegressionWithLBFGS. val algorithm = new LogisticRegressionWithLBFGS algorithm.setIntercept(true) algorithm.optimizer .setNumIterations(100) .setRegParam(0.01) .setConvergenceTol(1e-5) val model = algorithm.run(training) model.clearThreshold() val scoreAndLabels = test.map { point = val score = model.predict(point.features) (score, point.label) } val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() I did a (0.6, 0.4) split for training/test. The response is admit and features are GRE score, GPA, and college Rank. Spark: Weights (GRE, GPA, Rank): [0.0011576276331509304,0.048544858567336854,-0.394202150286076] Intercept: -0.6488972641282202 Area under ROC: 0.6294070512820512 StatsModel: Weights [0.0018, 0.7220, -0.3148] Intercept: -3.5913 Area under ROC: 0.69 The weights from statsmodel seems more reasonable if you consider for a one unit increase in gpa, the log odds of being admitted to graduate school increases by 0.72 in statsmodel than 0.04 in Spark. I have seen much bigger difference with other data. So my question is has anyone compared the results with other libraries and is anything wrong with my code to invoke LogisticRegressionWithLBFGS? As the real data I am processing is pretty big and really want to use Spark to get this to work. Please let me know if you have similar experience and how you resolve it. Thanks, Xin
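For reference, a minimal sketch of the LogisticRegressionWithSGD variant mentioned in point 1 (step size, iteration count and regParam are placeholders that will likely need tuning):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD

val lr = new LogisticRegressionWithSGD()
lr.setIntercept(true)
lr.optimizer
  .setNumIterations(200)
  .setStepSize(0.1)
  .setRegParam(0.0)
val model = lr.run(training)   // training: RDD[LabeledPoint]
model.clearThreshold()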
Cannot submit SparkPi to Standalone (1.3.1) running on another Server (Both Linux)
I am attempting to submit a job (using SparkPi) from one Linux machine (Ubuntu 14.04) to Spark 1.3.1 running in standalone mode on another Linux machine (Xubuntu 12.04; spartacus.servile.war), but I cannot make a connection. I have investigated everything I can think of to diagnose/fix the problem but have run out of ideas. Here are the facts; On the Xubuntu machine I can submit SparkPi without a problem. I can also test successfully that the master is listening on port 7077 by connecting with Telnet. Netstat shows: tcp6 0 0 spartacus.servile.war:7077 [::]:* LISTEN Iptables is not running, it is not even installed. I have log4j set to log in DEBUG mode to a file. On the Ubuntu client machine I can view the Spark Master web page at port 8080: http://spartacus:8080/ I can of course telnet to port 8080 on spartacus as well. If I try to telnet to port 7077 I get connection refused. If I try to submit SparkPI on this machine like so: ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://spartacus.servile.war:7077 --executor-memory 10G --total-executor-cores 8 /home/carey/dev/spark-1.3.1-bin-hadoop2.6/lib/spark-examples-1.3.1-hadoop2.6.0.jar 1 I get the following messages: 15/05/20 13:38:19 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkmas...@spartacus.servile.war:7077: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkmas...@spartacus.servile.war:7077 15/05/20 13:38:19 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkmas...@spartacus.servile.war:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: spartacus.servile.war/ 192.168.0.113:7077 Using spartacus or 192.168.0.113 instead of spartacus.servile.war makes no difference. Absolutely nothing shows up in the Spark log on spartacus when I try to submit, I just see the worker heartbeat exchange. In my hosts file on this machine I have: 192.168.0.113 spartacus.servile.war spartacus Using the default spark-env.sh or setting: export SPARK_MASTER_IP=spartacus.servile.war (or just spartacus, or 192.168.0.113) makes no difference. I have tried each combination of host ID in the submit and in the spark-env.sh file together (3x4 = 12 combinations) with the same result each time. Iptables is not running on the Ubuntu machine either. What is it I am missing?
Help needed with Py4J
Hi Colleagues We need to call a Scala Class from pySpark in Ipython notebook. We tried something like below : from py4j.java_gateway import java_import java_import(sparkContext._jvm,'mynamespace') myScalaClass = sparkContext._jvm.SimpleScalaClass () myScalaClass.sayHello(World) Works Fine But When we try to pass sparkContext to our class it fails like below myContext = _jvm.MySQLContext(sparkContext) fails with AttributeErrorTraceback (most recent call last) ipython-input-19-34330244f574 in module() 1 z = _jvm.MySQLContext(sparkContext) C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args) 690 691 args_command = ''.join( -- 692 [get_command_part(arg, self._pool) for arg in new_args]) 693 694 command = CONSTRUCTOR_COMMAND_NAME +\ C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_command_part(parameter, python_proxy_pool) 263 command_part += ';' + interface 264 else: -- 265 command_part = REFERENCE_TYPE + parameter._get_object_id() 266 267 command_part += '\n' attributeError: 'SparkContext' object has no attribute '_get_object_id' And myContext = _jvm.MySQLContext(sparkContext._jsc) fails with Constructor org.apache.spark.sql.MySQLContext([class org.apache.spark.api.java.JavaSparkContext]) does not exist Would this be possible ... or there are serialization issues and hence not possible. If not what are the options we have to instantiate our own SQLContext written in scala from pySpark... Best Regards, Santosh
Re: --jars works in yarn-client but not yarn-cluster mode, why?
Thank you so much, Marcelo! It WORKS! 2015-05-21 2:05 GMT+08:00 Marcelo Vanzin van...@cloudera.com: Hello, Sorry for the delay. The issue you're running into is because most HBase classes are in the system class path, while jars added with --jars are only visible to the application class loader created by Spark. So classes in the system class path cannot see them. You can work around this by setting --driver-classpath /opt/.../htrace-core-3.1.0-incubating.jar and --conf spark.executor.extraClassPath= /opt/.../htrace-core-3.1.0-incubating.jar in your spark-submit command line. (You can also add those configs to your spark-defaults.conf to avoid having to type them all the time; and don't forget to include any other jars that might be needed.) On Mon, May 18, 2015 at 11:14 PM, Fengyun RAO raofeng...@gmail.com wrote: Thanks, Marcelo! Below is the full log, SLF4J: Class path contains multiple SLF4J bindings. SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.4.0-1.cdh5.4.0.p0.27/jars/avro-tools-1.7.6-cdh5.4.0.jar!/org/slf4j/impl/StaticLoggerBinder.class] SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation. SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory] 15/05/19 14:08:58 INFO yarn.ApplicationMaster: Registered signal handlers for [TERM, HUP, INT] 15/05/19 14:08:59 INFO yarn.ApplicationMaster: ApplicationAttemptId: appattempt_1432015548391_0003_01 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nobody, raofengyun); users with modify permissions: Set(nobody, raofengyun) 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Starting the user application in a separate Thread 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context initialization 15/05/19 14:09:00 INFO yarn.ApplicationMaster: Waiting for spark context initialization ... 15/05/19 14:09:00 INFO spark.SparkContext: Running Spark version 1.3.0 15/05/19 14:09:00 INFO spark.SecurityManager: Changing view acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: Changing modify acls to: nobody,raofengyun 15/05/19 14:09:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nobody, raofengyun); users with modify permissions: Set(nobody, raofengyun) 15/05/19 14:09:01 INFO slf4j.Slf4jLogger: Slf4jLogger started 15/05/19 14:09:01 INFO Remoting: Starting remoting 15/05/19 14:09:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@gs-server-v-127:7191] 15/05/19 14:09:01 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@gs-server-v-127:7191] 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'sparkDriver' on port 7191. 
15/05/19 14:09:01 INFO spark.SparkEnv: Registering MapOutputTracker 15/05/19 14:09:01 INFO spark.SparkEnv: Registering BlockManagerMaster 15/05/19 14:09:01 INFO storage.DiskBlockManager: Created local directory at /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/blockmgr-3250910b-693e-46ff-b057-26d552fd8abd 15/05/19 14:09:01 INFO storage.MemoryStore: MemoryStore started with capacity 259.7 MB 15/05/19 14:09:01 INFO spark.HttpFileServer: HTTP File server directory is /data1/cdh/yarn/nm/usercache/raofengyun/appcache/application_1432015548391_0003/httpd-5bc614bc-d8b1-473d-a807-4d9252eb679d 15/05/19 14:09:01 INFO spark.HttpServer: Starting HTTP Server 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/19 14:09:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:9349 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'HTTP file server' on port 9349. 15/05/19 14:09:01 INFO spark.SparkEnv: Registering OutputCommitCoordinator 15/05/19 14:09:01 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter 15/05/19 14:09:01 INFO server.Server: jetty-8.y.z-SNAPSHOT 15/05/19 14:09:01 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:63023 15/05/19 14:09:01 INFO util.Utils: Successfully started service 'SparkUI' on port 63023. 15/05/19 14:09:01 INFO ui.SparkUI: Started SparkUI at http://gs-server-v-127:63023 15/05/19 14:09:02 INFO cluster.YarnClusterScheduler: Created YarnClusterScheduler 15/05/19 14:09:02 INFO netty.NettyBlockTransferService: Server created on 33526 15/05/19 14:09:02 INFO storage.BlockManagerMaster: Trying to register BlockManager 15/05/19 14:09:02 INFO
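For anyone hitting the same HBase/htrace classpath issue, the spark-defaults.conf equivalent of Marcelo's suggestion would look roughly like the following (the /opt/... path is left elided as in the original message; note that the corresponding spark-submit flag is spelled --driver-class-path):

spark.driver.extraClassPath    /opt/.../htrace-core-3.1.0-incubating.jar
spark.executor.extraClassPath  /opt/.../htrace-core-3.1.0-incubating.jar

Multiple jars can be listed in either property, separated by ':' on Linux; they are prepended to the driver and executor class paths, which is what lets classes loaded from the system class loader (like the HBase classes here) see them.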
Spark build with Hive
Hi, can Spark 1.3.1 be built with Hive 1.2? It seems Spark 1.3.1 can only be built against Hive 0.13 or 0.12 according to the documentation. # Apache Hadoop 2.4.X with Hive 13 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package # Apache Hadoop 2.4.X with Hive 12 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package guoqing0...@yahoo.com.hk
Re: Compare LogisticRegression results using Mllib with those using other libraries (e.g. statsmodel)
I tried running this data set as described with my own implementation of L2 regularized logistic regression using LBFGS to compare: https://github.com/cdgore/fitbox https://github.com/cdgore/fitbox Intercept: -0.886745823033 Weights (['gre', 'gpa', 'rank']):[ 0.28862268 0.19402388 -0.36637964] Area under ROC: 0.724056603774 The difference could be from the feature preprocessing as mentioned. I normalized the features around 0: binary_train_normalized = (binary_train - binary_train.mean()) / binary_train.std() binary_test_normalized = (binary_test - binary_train.mean()) / binary_train.std() On a data set this small, the difference in models could also be the result of how the training/test sets were split. Have you tried running k-folds cross validation on a larger data set? Chris On May 20, 2015, at 6:15 PM, DB Tsai d...@netflix.com.INVALID wrote: Hi Xin, If you take a look at the model you trained, the intercept from Spark is significantly smaller than StatsModel, and the intercept represents a prior on categories in LOR which causes the low accuracy in Spark implementation. In LogisticRegressionWithLBFGS, the intercept is regularized due to the implementation of Updater, and the intercept should not be regularized. In the new pipleline APIs, a LOR with elasticNet is implemented, and the intercept is properly handled. https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala As you can see the tests, https://github.com/apache/spark/blob/master/mllib/src/test/scala/org/apache/spark/ml/classification/LogisticRegressionSuite.scala the result is exactly the same as R now. BTW, in both version, the feature scalings are done before training, and we train the model in scaled space but transform the model weights back to original space. The only difference is in the mllib version, LogisticRegressionWithLBFGS regularizes the intercept while in the ml version, the intercept is excluded from regularization. As a result, if lambda is zero, the model should be the same. On Wed, May 20, 2015 at 3:42 PM, Xin Liu liuxin...@gmail.com wrote: Hi, I have tried a few models in Mllib to train a LogisticRegression model. However, I consistently get much better results using other libraries such as statsmodel (which gives similar results as R) in terms of AUC. For illustration purpose, I used a small data (I have tried much bigger data) http://www.ats.ucla.edu/stat/data/binary.csv in http://www.ats.ucla.edu/stat/r/dae/logit.htm Here is the snippet of my usage of LogisticRegressionWithLBFGS. val algorithm = new LogisticRegressionWithLBFGS algorithm.setIntercept(true) algorithm.optimizer .setNumIterations(100) .setRegParam(0.01) .setConvergenceTol(1e-5) val model = algorithm.run(training) model.clearThreshold() val scoreAndLabels = test.map { point = val score = model.predict(point.features) (score, point.label) } val metrics = new BinaryClassificationMetrics(scoreAndLabels) val auROC = metrics.areaUnderROC() I did a (0.6, 0.4) split for training/test. The response is admit and features are GRE score, GPA, and college Rank. 
Spark: Weights (GRE, GPA, Rank): [0.0011576276331509304,0.048544858567336854,-0.394202150286076] Intercept: -0.6488972641282202 Area under ROC: 0.6294070512820512 StatsModel: Weights [0.0018, 0.7220, -0.3148] Intercept: -3.5913 Area under ROC: 0.69 The weights from statsmodel seems more reasonable if you consider for a one unit increase in gpa, the log odds of being admitted to graduate school increases by 0.72 in statsmodel than 0.04 in Spark. I have seen much bigger difference with other data. So my question is has anyone compared the results with other libraries and is anything wrong with my code to invoke LogisticRegressionWithLBFGS? As the real data I am processing is pretty big and really want to use Spark to get this to work. Please let me know if you have similar experience and how you resolve it. Thanks, Xin - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
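The centering and scaling Chris describes can also be done on the Spark side before training; a minimal sketch using MLlib's StandardScaler, assuming the same training and test RDDs of LabeledPoint as in the snippet above (withMean = true requires dense feature vectors, which is fine for this three-feature data set):

import org.apache.spark.mllib.feature.StandardScaler
import org.apache.spark.mllib.regression.LabeledPoint

// Fit mean and standard deviation on the training features only, then apply
// the same (x - mean) / std transform to both training and test data.
val scaler = new StandardScaler(withMean = true, withStd = true)
  .fit(training.map(_.features))
val scaledTraining = training.map(p => LabeledPoint(p.label, scaler.transform(p.features)))
val scaledTest     = test.map(p => LabeledPoint(p.label, scaler.transform(p.features)))

Training on scaledTraining and evaluating on scaledTest removes feature scale as a variable when comparing against statsmodel or R.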
Re: Help needed with Py4J
Yeah ... I am able to instantiate the simple scala class as explained below which is from the same JAR Regards Santosh On May 20, 2015, at 7:26 PM, Holden Karau hol...@pigscanfly.camailto:hol...@pigscanfly.ca wrote: Are your jars included in both the driver and worker class paths? On Wednesday, May 20, 2015, Addanki, Santosh Kumar santosh.kumar.adda...@sap.commailto:santosh.kumar.adda...@sap.com wrote: Hi Colleagues We need to call a Scala Class from pySpark in Ipython notebook. We tried something like below : from py4j.java_gateway import java_import java_import(sparkContext._jvm,'mynamespace') myScalaClass = sparkContext._jvm.SimpleScalaClass () myScalaClass.sayHello(“World”) Works Fine But When we try to pass sparkContext to our class it fails like below myContext = _jvm.MySQLContext(sparkContext) fails with AttributeErrorTraceback (most recent call last) ipython-input-19-34330244f574 in module() 1 z = _jvm.MySQLContext(sparkContext) C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args) 690 691 args_command = ''.join( -- 692 [get_command_part(arg, self._pool) for arg in new_args]) 693 694 command = CONSTRUCTOR_COMMAND_NAME +\ C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_command_part(parameter, python_proxy_pool) 263 command_part += ';' + interface 264 else: -- 265 command_part = REFERENCE_TYPE + parameter._get_object_id() 266 267 command_part += '\n' attributeError: 'SparkContext' object has no attribute '_get_object_id' And myContext = _jvm.MySQLContext(sparkContext._jsc) fails with Constructor org.apache.spark.sql.MySQLContext([class org.apache.spark.api.java.JavaSparkContext]) does not exist Would this be possible … or there are serialization issues and hence not possible. If not what are the options we have to instantiate our own SQLContext written in scala from pySpark… Best Regards, Santosh -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
Re: Spark Streaming graceful shutdown in Spark 1.4
If you are talking about handling driver crash failures, then all bets are off anyways! Adding a shutdown hook in the hope of handling driver process failure handles only some cases (Ctrl-C), but does not handle cases like SIGKILL (which does not run JVM shutdown hooks) or a driver machine crash. So it's not a good idea to rely on that. Nonetheless I have opened a PR to handle the shutdown of the StreamingContext in the same way as the SparkContext. https://github.com/apache/spark/pull/6307 On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Thanks Sean, you are right. If the driver program is running then I can handle shutdown in the main exit path. But if the driver machine crashes (or if you just stop the application, for example by killing the driver process), then a shutdown hook is the only option, isn't it? What I am trying to say is, just doing ssc.stop in sys.ShutdownHookThread or Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need to use Utils.addShutdownHook with a priority. So I am just checking whether Spark Streaming can make graceful shutdown the default shutdown mechanism. Dibyendu On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: I don't think you should rely on a shutdown hook. Ideally you try to stop it in the main exit path of your program, even in case of an exception. On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say that within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true)? This won't work anymore in 1.4. The SparkContext gets stopped before the Receiver has processed all received blocks and I see the exception below in the logs. But if I add Utils.addShutdownHook with the priority as I mentioned, then graceful shutdown works. In that case shutdown hooks run in priority order.
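For completeness, a rough sketch of stopping from the main exit path rather than from a shutdown hook, along the lines Sean suggests; the /tmp/stop-streaming marker file is only an illustrative trigger, and any external signal the application already has would do:

import java.nio.file.{Files, Paths}
import org.apache.spark.streaming.StreamingContext

// Poll for an external stop request and shut down gracefully from the driver's
// own code path instead of relying on a JVM shutdown hook.
def runUntilStopped(ssc: StreamingContext): Unit = {
  ssc.start()
  var stopped = false
  while (!stopped) {
    // returns true if the context has already been stopped
    stopped = ssc.awaitTerminationOrTimeout(10000)
    if (!stopped && Files.exists(Paths.get("/tmp/stop-streaming"))) {
      // let the received data finish processing before tearing down the SparkContext
      ssc.stop(stopSparkContext = true, stopGracefully = true)
      stopped = true
    }
  }
}

This still does not cover SIGKILL or a machine crash, as noted above, but it does cover the normal operational case of asking a running streaming job to drain and exit.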
Re: GradientBoostedTrees.trainRegressor with categoricalFeaturesInfo
JIRA created: https://issues.apache.org/jira/browse/SPARK-7781 Joseph, I agree, I'm debating removing this feature altogether, but I'm putting the model through its paces. Thanks. -Don On Wed, May 20, 2015 at 7:52 PM, Joseph Bradley jos...@databricks.com wrote: One more comment: That's a lot of categories for a feature. If it makes sense for your data, it will run faster if you can group the categories or split the 1895 categories into a few features which have fewer categories. On Wed, May 20, 2015 at 3:17 PM, Burak Yavuz brk...@gmail.com wrote: Could you please open a JIRA for it? The maxBins input is missing for the Python Api. Is it possible if you can use the current master? In the current master, you should be able to use trees with the Pipeline Api and DataFrames. Best, Burak On Wed, May 20, 2015 at 2:44 PM, Don Drake dondr...@gmail.com wrote: I'm running Spark v1.3.1 and when I run the following against my dataset: model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatu res, maxDepth=6, numIterations=3) The job will fail with the following message: Traceback (most recent call last): File /Users/drake/fd/spark/mltest.py, line 73, in module model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatures, maxDepth=6, numIterations=3) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py, line 553, in trainRegressor loss, numIterations, learningRate, maxDepth) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/tree.py, line 438, in _train loss, numIterations, learningRate, maxDepth) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py, line 120, in callMLlibFunc return callJavaFunc(sc, api, *args) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/pyspark/mllib/common.py, line 113, in callJavaFunc return _java2py(sc, func(*args)) File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py, line 538, in __call__ File /Users/drake/spark/spark-1.3.1-bin-hadoop2.6/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py, line 300, in get_return_value 15/05/20 16:40:12 INFO BlockManager: Removing block rdd_32_95 py4j.protocol.Py4JJavaError: An error occurred while calling o69.trainGradientBoostedTreesModel. 
: java.lang.IllegalArgumentException: requirement failed: DecisionTree requires maxBins (= 32) = max categories in categorical features (= 1895) at scala.Predef$.require(Predef.scala:233) at org.apache.spark.mllib.tree.impl.DecisionTreeMetadata$.buildMetadata(DecisionTreeMetadata.scala:128) at org.apache.spark.mllib.tree.RandomForest.run(RandomForest.scala:138) at org.apache.spark.mllib.tree.DecisionTree.run(DecisionTree.scala:60) at org.apache.spark.mllib.tree.GradientBoostedTrees$.org$apache$spark$mllib$tree$GradientBoostedTrees$$boost(GradientBoostedTrees.scala:150) at org.apache.spark.mllib.tree.GradientBoostedTrees.run(GradientBoostedTrees.scala:63) at org.apache.spark.mllib.tree.GradientBoostedTrees$.train(GradientBoostedTrees.scala:96) at org.apache.spark.mllib.api.python.PythonMLLibAPI.trainGradientBoostedTreesModel(PythonMLLibAPI.scala:595) So, it's complaining about the maxBins, if I provide maxBins=1900 and re-run it: model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catFeatu res, maxDepth=6, numIterations=3, maxBins=1900) Traceback (most recent call last): File /Users/drake/fd/spark/mltest.py, line 73, in module model = GradientBoostedTrees.trainRegressor(trainingData, categoricalFeaturesInfo=catF eatures, maxDepth=6, numIterations=3, maxBins=1900) TypeError: trainRegressor() got an unexpected keyword argument 'maxBins' It now says it knows nothing of maxBins. If I run the same command against DecisionTree or RandomForest (with maxBins=1900) it works just fine. Seems like a bug in GradientBoostedTrees. Suggestions? -Don -- Donald Drake Drake Consulting http://www.drakeconsulting.com/ 800-733-2143 -- Donald Drake Drake Consulting http://www.drakeconsulting.com/ http://www.MailLaunder.com/ http://www.DrudgeSiren.com/ http://plu.gd/ 800-733-2143
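Until the Python wrapper exposes maxBins, one possible workaround is to run this particular training step through the Scala API, where the boosting strategy does expose it; a rough sketch, with trainingData and a Scala Map[Int, Int] corresponding to Don's catFeatures:

import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy

// Raise maxBins above the category count (1895) of the largest categorical feature.
val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 3
boostingStrategy.treeStrategy.maxDepth = 6
boostingStrategy.treeStrategy.maxBins = 1900
boostingStrategy.treeStrategy.categoricalFeaturesInfo = catFeatures

val model = GradientBoostedTrees.train(trainingData, boostingStrategy)

Grouping or re-encoding the 1895-category feature, as Joseph suggests, is still worth doing regardless, since splits over that many categories are expensive.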
Re: RE: Spark build with Hive
Thanks very much. Which versions will be supported in the upcoming 1.4? I hope it will support more versions. guoqing0...@yahoo.com.hk From: Cheng, Hao Date: 2015-05-21 11:20 To: Ted Yu; guoqing0...@yahoo.com.hk CC: user Subject: RE: Spark build with Hive Yes, ONLY 0.12.0 and 0.13.1 are supported currently. Hopefully we can support higher versions in the next 1 or 2 releases. From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Thursday, May 21, 2015 11:12 AM To: guoqing0...@yahoo.com.hk Cc: user Subject: Re: Spark build with Hive I am afraid even Hive 1.0 is not supported, let alone Hive 1.2. Cheers On Wed, May 20, 2015 at 8:08 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hi, can Spark 1.3.1 be built with Hive 1.2? It seems Spark 1.3.1 can only be built against Hive 0.13 or 0.12 according to the documentation. # Apache Hadoop 2.4.X with Hive 13 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package # Apache Hadoop 2.4.X with Hive 12 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package guoqing0...@yahoo.com.hk
Re: [Unit Test Failure] Test org.apache.spark.streaming.JavaAPISuite.testCount failed
Has this been fixed for you now? There have been a number of patches since then and it may have been fixed. On Thu, May 14, 2015 at 7:20 AM, Wangfei (X) wangf...@huawei.com wrote: Yes, it fails repeatedly on my local Jenkins. Sent from my iPhone On May 14, 2015, at 18:30, Tathagata Das t...@databricks.com wrote: Do you get this failure repeatedly? On Thu, May 14, 2015 at 12:55 AM, kf wangf...@huawei.com wrote: Hi all, I got the following error when I ran the Spark unit tests via dev/run-tests on the latest branch-1.4 branch. The latest commit id: commit d518c0369fa412567855980c3f0f426cde5c190d Author: zsxwing zsxw...@gmail.com Date: Wed May 13 17:58:29 2015 -0700 error [info] Test org.apache.spark.streaming.JavaAPISuite.testCount started [error] Test org.apache.spark.streaming.JavaAPISuite.testCount failed: org.apache.spark.SparkException: Error communicating with MapOutputTracker [error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:113) [error] at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:119) [error] at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:324) [error] at org.apache.spark.SparkEnv.stop(SparkEnv.scala:93) [error] at org.apache.spark.SparkContext.stop(SparkContext.scala:1577) [error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:626) [error] at org.apache.spark.streaming.StreamingContext.stop(StreamingContext.scala:597) [error] at org.apache.spark.streaming.TestSuiteBase$class.runStreamsWithPartitions(TestSuiteBase.scala:403) [error] at org.apache.spark.streaming.JavaTestUtils$.runStreamsWithPartitions(JavaTestUtils.scala:102) [error] at org.apache.spark.streaming.TestSuiteBase$class.runStreams(TestSuiteBase.scala:344) [error] at org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102) [error] at org.apache.spark.streaming.JavaTestBase$class.runStreams(JavaTestUtils.scala:74) [error] at org.apache.spark.streaming.JavaTestUtils$.runStreams(JavaTestUtils.scala:102) [error] at org.apache.spark.streaming.JavaTestUtils.runStreams(JavaTestUtils.scala) [error] at org.apache.spark.streaming.JavaAPISuite.testCount(JavaAPISuite.java:103) [error] ... [error] Caused by: org.apache.spark.SparkException: Error sending message [message = StopMapOutputTracker] [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:116) [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:78) [error] at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:109) [error] ... 52 more [error] Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] [error] at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) [error] at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) [error] at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) [error] at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) [error] at scala.concurrent.Await$.result(package.scala:107) [error] at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:102) [error] ... 54 more -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Unit-Test-Failure-Test-org-apache-spark-streaming-JavaAPISuite-testCount-failed-tp22879.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Help needed with Py4J
Ah sorry, I missed that part (I've been dealing with some py4j stuff today as well and maybe skimmed it a bit too quickly). Do you have your code somewhere I could take a look at? Also does your constructor expect a JavaSparkContext or a regular SparkContext (if you look at how the SQLContext is constructed in python its done using a regular SparkContext, so _jsc.sc() is used). On Wed, May 20, 2015 at 7:32 PM, Addanki, Santosh Kumar santosh.kumar.adda...@sap.com wrote: Yeah ... I am able to instantiate the simple scala class as explained below which is from the same JAR Regards Santosh On May 20, 2015, at 7:26 PM, Holden Karau hol...@pigscanfly.ca wrote: Are your jars included in both the driver and worker class paths? On Wednesday, May 20, 2015, Addanki, Santosh Kumar santosh.kumar.adda...@sap.com wrote: Hi Colleagues We need to call a Scala Class from pySpark in Ipython notebook. We tried something like below : from py4j.java_gateway import java_import java_import(sparkContext._jvm,'mynamespace') myScalaClass = sparkContext._jvm.SimpleScalaClass () myScalaClass.sayHello(“World”) Works Fine But When we try to pass sparkContext to our class it fails like below myContext = _jvm.MySQLContext(sparkContext) fails with AttributeErrorTraceback (most recent call last) ipython-input-19-34330244f574 in module() 1 z = _jvm.MySQLContext(sparkContext) C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args) 690 691 args_command = ''.join( -- 692 [get_command_part(arg, self._pool) for arg in new_args]) 693 694 command = CONSTRUCTOR_COMMAND_NAME +\ C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_command_part(parameter, python_proxy_pool) 263 command_part += ';' + interface 264 else: -- 265 command_part = REFERENCE_TYPE + parameter._get_object_id() 266 267 command_part += '\n' attributeError: 'SparkContext' object has no attribute '_get_object_id' And myContext = _*jvm.MySQLContext(sparkContext.*_jsc) fails with Constructor org.apache.spark.sql.MySQLContext([class org.apache.spark.api.java.JavaSparkContext]) does not exist Would this be possible … or there are serialization issues and hence not possible. If not what are the options we have to instantiate our own SQLContext written in scala from pySpark… Best Regards, Santosh -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
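For what it's worth, the Scala side that matches the _jsc.sc() approach would look roughly like the following; this is only a sketch of the expected constructor shape, since the actual MySQLContext source isn't shown in the thread:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// A custom SQLContext normally takes the Scala SparkContext. From Python, py4j can
// then be handed sparkContext._jsc.sc() -- the underlying Scala context -- rather
// than the JavaSparkContext (sparkContext._jsc) or the Python SparkContext object,
// which is why the two constructor calls in the original message fail.
class MySQLContext(sc: SparkContext) extends SQLContext(sc) {
  // custom methods would go here
}

With a constructor like this, _jvm.MySQLContext(sparkContext._jsc.sc()) from the notebook should resolve.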
RE: RE: Spark build with Hive
In 1.4 I think we still only support 0.12.0 and 0.13.1. From: guoqing0...@yahoo.com.hk [mailto:guoqing0...@yahoo.com.hk] Sent: Thursday, May 21, 2015 12:03 PM To: Cheng, Hao; Ted Yu Cc: user Subject: Re: RE: Spark build with Hive Thanks very much. Which versions will be supported in the upcoming 1.4? I hope it will support more versions. guoqing0...@yahoo.com.hk From: Cheng, Hao Date: 2015-05-21 11:20 To: Ted Yu; guoqing0...@yahoo.com.hk CC: user Subject: RE: Spark build with Hive Yes, ONLY 0.12.0 and 0.13.1 are supported currently. Hopefully we can support higher versions in the next 1 or 2 releases. From: Ted Yu [mailto:yuzhih...@gmail.com] Sent: Thursday, May 21, 2015 11:12 AM To: guoqing0...@yahoo.com.hk Cc: user Subject: Re: Spark build with Hive I am afraid even Hive 1.0 is not supported, let alone Hive 1.2. Cheers On Wed, May 20, 2015 at 8:08 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hi, can Spark 1.3.1 be built with Hive 1.2? It seems Spark 1.3.1 can only be built against Hive 0.13 or 0.12 according to the documentation. # Apache Hadoop 2.4.X with Hive 13 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package # Apache Hadoop 2.4.X with Hive 12 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package guoqing0...@yahoo.com.hk
Re: Spark Streaming graceful shutdown in Spark 1.4
Thanks Tathagata for making this change. Dibyendu On Thu, May 21, 2015 at 8:24 AM, Tathagata Das t...@databricks.com wrote: If you are talking about handling driver crash failures, then all bets are off anyways! Adding a shutdown hook in the hope of handling driver process failure handles only some cases (Ctrl-C), but does not handle cases like SIGKILL (which does not run JVM shutdown hooks) or a driver machine crash. So it's not a good idea to rely on that. Nonetheless I have opened a PR to handle the shutdown of the StreamingContext in the same way as the SparkContext. https://github.com/apache/spark/pull/6307 On Tue, May 19, 2015 at 12:51 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Thanks Sean, you are right. If the driver program is running then I can handle shutdown in the main exit path. But if the driver machine crashes (or if you just stop the application, for example by killing the driver process), then a shutdown hook is the only option, isn't it? What I am trying to say is, just doing ssc.stop in sys.ShutdownHookThread or Runtime.getRuntime().addShutdownHook (in Java) won't work anymore. I need to use Utils.addShutdownHook with a priority. So I am just checking whether Spark Streaming can make graceful shutdown the default shutdown mechanism. Dibyendu On Tue, May 19, 2015 at 1:03 PM, Sean Owen so...@cloudera.com wrote: I don't think you should rely on a shutdown hook. Ideally you try to stop it in the main exit path of your program, even in case of an exception. On Tue, May 19, 2015 at 7:59 AM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: You mean to say that within Runtime.getRuntime().addShutdownHook I call ssc.stop(stopSparkContext = true, stopGracefully = true)? This won't work anymore in 1.4. The SparkContext gets stopped before the Receiver has processed all received blocks and I see the exception below in the logs. But if I add Utils.addShutdownHook with the priority as I mentioned, then graceful shutdown works. In that case shutdown hooks run in priority order.
Re: Help needed with Py4J
Are your jars included in both the driver and worker class paths? On Wednesday, May 20, 2015, Addanki, Santosh Kumar santosh.kumar.adda...@sap.com wrote: Hi Colleagues We need to call a Scala Class from pySpark in Ipython notebook. We tried something like below : from py4j.java_gateway import java_import java_import(sparkContext._jvm,'mynamespace') myScalaClass = sparkContext._jvm.SimpleScalaClass () myScalaClass.sayHello(“World”) Works Fine But When we try to pass sparkContext to our class it fails like below myContext = _jvm.MySQLContext(sparkContext) fails with AttributeErrorTraceback (most recent call last) ipython-input-19-34330244f574 in module() 1 z = _jvm.MySQLContext(sparkContext) C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py in __call__(self, *args) 690 691 args_command = ''.join( -- 692 [get_command_part(arg, self._pool) for arg in new_args]) 693 694 command = CONSTRUCTOR_COMMAND_NAME +\ C:\Users\i033085\spark\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py in get_command_part(parameter, python_proxy_pool) 263 command_part += ';' + interface 264 else: -- 265 command_part = REFERENCE_TYPE + parameter._get_object_id() 266 267 command_part += '\n' attributeError: 'SparkContext' object has no attribute '_get_object_id' And myContext = _*jvm.MySQLContext(sparkContext.*_jsc) fails with Constructor org.apache.spark.sql.MySQLContext([class org.apache.spark.api.java.JavaSparkContext]) does not exist Would this be possible … or there are serialization issues and hence not possible. If not what are the options we have to instantiate our own SQLContext written in scala from pySpark… Best Regards, Santosh -- Cell : 425-233-8271 Twitter: https://twitter.com/holdenkarau Linked In: https://www.linkedin.com/in/holdenkarau
Re: Spark build with Hive
I am afraid even Hive 1.0 is not supported, let alone Hive 1.2. Cheers On Wed, May 20, 2015 at 8:08 PM, guoqing0...@yahoo.com.hk guoqing0...@yahoo.com.hk wrote: Hi, can Spark 1.3.1 be built with Hive 1.2? It seems Spark 1.3.1 can only be built against Hive 0.13 or 0.12 according to the documentation. # Apache Hadoop 2.4.X with Hive 13 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package # Apache Hadoop 2.4.X with Hive 12 support mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-0.12.0 -Phive-thriftserver -DskipTests clean package -- guoqing0...@yahoo.com.hk
Storing spark processed output to Database asynchronously.
Hi, From my understanding of Spark Streaming, I created a spark entry point, for continuous UDP data, using: SparkConf conf = new SparkConf().setMaster(local[2]).setAppName(NetworkWordCount);JavaStreamingContext jssc = new JavaStreamingContext(conf, new Duration(1));JavaReceiverInputDStreamString lines = jssc.receiverStream(new CustomReceiver(8060)); Now, when I process this input stream using: JavaDStream hash=lines.flatMap(my-code)JavaPairDStream tuple= hash.mapToPair(my-code)JavaPairDStream output= tuple.reduceByKey(my-code) output.foreachRDD( new Function2JavaPairRDDString,ArrayListString,Time,Void(){ @Override public Void call( JavaPairRDDString, ArrayListString arg0, Time arg1) throws Exception { // TODO Auto-generated method stub new AsyncRDDActions(arg0.rdd(), null); arg0.foreachPartition( new VoidFunctionIteratorTuple2String,ArrayListString(){ @Override public void call( IteratorTuple2String, ArrayListString arg0) throws Exception { // TODO Auto-generated method stub GraphDatabaseService graphDb = new GraphDatabaseFactory().newEmbeddedDatabaseBuilder(/dev/shm/Advertisement/data/) .setConfig(remote_shell_enabled, true) .newGraphDatabase(); try (Transaction tx = graphDb.beginTx()) { while (arg0.hasNext()) { Tuple2 String, ArrayList String tuple = arg0.next(); Node HMac=Neo4jOperations.getHMacFromValue(graphDb, tuple._1); boolean oldHMac=false; if (HMac!= null){ System.out.println(Alread in Database: + tuple._1); oldHMac=true; } else HMac=Neo4jOperations.createHMac(graphDb, tuple._1); ArrayListString zipcodes=tuple._2; for(String zipcode : zipcodes){ Node Zipcode=Neo4jOperations.getZipcodeFromValue(graphDb, zipcode); if(Zipcode!=null){ System.out.println(Already in Database: + zipcode); if(oldHMac==true Neo4jOperations.getRelationshipBetween(HMac, Zipcode)!=null) Neo4jOperations.updateToCurrentTime(HMac, Zipcode); else Neo4jOperations.travelTo(HMac, Zipcode); } else{ Zipcode=Neo4jOperations.createZipcode(graphDb, zipcode); Neo4jOperations.travelTo(HMac, Zipcode); } } } tx.success(); } graphDb.shutdown(); } }); return null; } }); The part of code in output.foreachRDD pushes the output of spark into Neo4j Database. Checking for duplicates values. This part of code is very time consuming because of which my processing time exceeds batch time. Because of that, it *result in dataloss*. So, I was thinking of pushing the output into the database asynchronously. I found AsyncRDDActions( https://spark.apache.org/docs/1.1.1/api/java/org/apache/spark/rdd/AsyncRDDActions.html) for this purpose, but cannot find a working example for that in Java. Especially, the function foreachPatitionAsync inside which we have to use Function1 Any help is appreciated. Thanks, Gautam
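I'm not sure about a Java example, but in Scala the async variant looks roughly like the sketch below; it assumes output is a DStream of (String, ArrayList[String]) pairs as in the code above, and elides the Neo4j work:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

output.foreachRDD { (rdd, time) =>
  // foreachPartitionAsync returns a FutureAction instead of blocking the batch,
  // so the database writes proceed in the background.
  val future = rdd.foreachPartitionAsync { partition =>
    // open the graph DB once per partition, upsert each tuple, close the DB
    partition.foreach { case (hmac, zipcodes) =>
      println(s"$hmac -> $zipcodes") // Neo4j upsert would go here
    }
  }
  future.onComplete {
    case Success(_) => println(s"batch $time stored")
    case Failure(e) => println(s"batch $time failed: $e")
  }
}

One caveat: making the writes asynchronous hides their latency from the batch duration but does not make them faster, so if Neo4j cannot keep up on average, the pending futures will still pile up.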
View all user's application logs in history server
Hi, I'm using Spark 1.4.0-rc1 and I'm using default settings for history server. But I can only see my own logs. Is it possible to view all user's logs? The permission is fine for the user group. -- Jianshi Huang LinkedIn: jianshi Twitter: @jshuang Github Blog: http://huangjs.github.com/
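Two things that might be worth checking, as guesses: whether the history server's OS user can actually read the other users' event log files under the shared event log directory (they are often written with owner-only permissions), and whether UI ACLs are enabled on the history server. The relevant spark-defaults.conf entries would look something like this, with the HDFS path only a placeholder:

spark.eventLog.enabled           true
spark.eventLog.dir               hdfs:///shared/spark-events
spark.history.fs.logDirectory    hdfs:///shared/spark-events
spark.history.ui.acls.enable     false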
Re: Mesos Spark Tasks - Lost
Tim thanks for your reply, I am following this quite clear mesos-spark tutorial: https://docs.mesosphere.com/tutorials/run-spark-on-mesos/ So mainly I tried running spark-shell which locally works fine but when the jobs are submitted through mesos something goes wrong! My question is: is there a some extra configuration needed for the workers (that is not mentioned at the tutorial) ?? The Executor Lost message I get is really generic so I dont know whats going on.. Please check the attached mesos execution event log. Thanks again, Panagiotis On Wed, May 20, 2015 at 8:21 AM, Tim Chen t...@mesosphere.io wrote: Can you share your exact spark-submit command line? And also cluster mode is not yet released yet (1.4) and doesn't support spark-shell, so I think you're just using client mode unless you're using latest master. Tim On Tue, May 19, 2015 at 8:57 AM, Panagiotis Garefalakis panga...@gmail.com wrote: Hello all, I am facing a weird issue for the last couple of days running Spark on top of Mesos and I need your help. I am running Mesos in a private cluster and managed to deploy successfully hdfs, cassandra, marathon and play but Spark is not working for a reason. I have tried so far: different java versions (1.6 and 1.7 oracle and openjdk), different spark-env configuration, different Spark versions (from 0.8.8 to 1.3.1), different HDFS versions (hadoop 5.1 and 4.6), and updating pom dependencies. More specifically while local tasks complete fine, in cluster mode all the tasks get lost. (both using spark-shell and spark-submit) From the worker log I see something like this: --- I0519 02:36:30.475064 12863 fetcher.cpp:214] Fetching URI 'hdfs:/:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' I0519 02:36:30.747372 12863 fetcher.cpp:99] Fetching URI 'hdfs://X:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' using Hadoop Client I0519 02:36:30.747546 12863 fetcher.cpp:109] Downloading resource from 'hdfs://:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' to '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' I0519 02:36:34.205878 12863 fetcher.cpp:78] Extracted resource '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' into '/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3' *Error: Could not find or load main class two* --- And from the Spark Terminal: --- 15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0 15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled 15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at SparkPi.scala:35 15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at SparkPi.scala:35 Exception in thread main org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure: Lost task 7.3 in stage 0.0 (TID 26, ): ExecutorLostFailure (executor lost) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org 
http://org.apache.spark.scheduler.dagscheduler.org/$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) .. at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) --- Any help will be greatly appreciated! Regards, Panagiotis -sparklogs-spark-shell-1431993674182-EVENT_LOG_1 Description: Binary data - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Intermittent difficulties for Worker to contact Master on same machine in standalone
But if I'm reading his email correctly he's saying that: 1. The master and slave are on the same box (so network hiccups are unlikely culprit) 2. The failures are intermittent -- i.e program works for a while then worker gets disassociated... Is it possible that the master restarted? We used to have problems like this where we'd restart the master process, it won't be listening on 7077 for some time, but the worker process is trying to connect and by the time the master is up the worker has given up... On Wed, May 20, 2015 at 5:16 AM, Evo Eftimov evo.efti...@isecc.com wrote: Check whether the name can be resolved in the /etc/hosts file (or DNS) of the worker (the same btw applies for the Node where you run the driver app – all other nodes must be able to resolve its name) *From:* Stephen Boesch [mailto:java...@gmail.com] *Sent:* Wednesday, May 20, 2015 10:07 AM *To:* user *Subject:* Intermittent difficulties for Worker to contact Master on same machine in standalone What conditions would cause the following delays / failure for a standalone machine/cluster to have the Worker contact the Master? 15/05/20 02:02:53 INFO WorkerWebUI: Started WorkerWebUI at http://10.0.0.3:8081 15/05/20 02:02:53 INFO Worker: Connecting to master akka.tcp://sparkMaster@mellyrn.local:7077/user/Master... 15/05/20 02:02:53 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077 15/05/20 02:03:04 INFO Worker: Retrying connection to master (attempt # 1) .. .. 15/05/20 02:03:26 INFO Worker: Retrying connection to master (attempt # 3) 15/05/20 02:03:26 INFO Worker: Connecting to master akka.tcp://sparkMaster@mellyrn.local:7077/user/Master... 15/05/20 02:03:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@mellyrn.local:7077]. Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters. Reason: Connection refused: mellyrn.local/10.0.0.3:7077