Re: SparkSQL production readiness
OK, good to know data frames are still experimental. Thanks Michael. On Mon, Mar 2, 2015 at 12:37 PM, Michael Armbrust mich...@databricks.com wrote: We have been using Spark SQL in production for our customers at Databricks for almost a year now. We also know of some very large production deployments elsewhere. It is still a young project, but I wouldn't call it alpha. The primary changes to the API are the addition of the DataFrame interface, which is an expansion of the DSL that was already there. All of the SQL / HiveQL stuff remains unchanged, as well as the internal execution engine. DataFrames are still marked experimental, since as you said, we should let people use them before cementing them.
Re: JavaRDD method ambiguous after upgrading to Java 8
What's your actual code? That can't compile, since groupBy would return a JavaPairRDD. I tried compiling that (after changing to void type) with Java 7 and Java 8 (meaning, not just the JDK but compiling for the language level too) and both worked. On Mon, Mar 2, 2015 at 10:03 PM, btiernay btier...@hotmail.com wrote: The following method demonstrates the issue: private static Tuple2<String, String> group(JavaPairRDD<String, String> rdd, Function<Tuple2<String, String>, String> f) { return rdd.groupBy(f); } I get the following compilation error using Spark 1.1.1 and Java 8u31: The method groupBy(Function<Tuple2<String,String>,String>) is ambiguous for the type JavaPairRDD<String,String> I believe it is ambiguous because both JavaPairRDD and JavaRDDLike offer the same method. Has anyone encountered this before? Is there a solution? Thanks!
Re: Spark Error: Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077
bq. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 There should be some more output following the above line. Can you post them ? Cheers On Mon, Mar 2, 2015 at 2:06 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com.invalid wrote: Hi, I am running spark on my mac. It is reading from a kafka topic and then writes the data to a hbase table. When I do a spark submit, I get this error, Error connecting to master spark://localhost:7077 (akka.tcp://sparkMaster@localhost:7077), exiting. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 My submit statement looks like this, ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar --class KafkaMain --master spark://localhost:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g /Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
Re: Dataframe v/s SparkSQL
They are the same. These are just different ways to construct Catalyst logical plans. On Mon, Mar 2, 2015 at 12:50 PM, Manoj Samel manojsamelt...@gmail.com wrote: Is it correct to say that the Spark DataFrame APIs are implemented using the same execution engine as SparkSQL? In other words, while the DataFrame API is different from SparkSQL, the runtime performance of equivalent constructs in DataFrame and SparkSQL should be the same. So one should be able to choose whichever of the two (DF v/s SQL) suits the use case and not worry about runtime performance. Please comment ... Thanks,
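A minimal sketch (not from the thread) of how to see that for yourself, assuming Spark 1.3's DataFrame API, an existing SparkContext `sc`, and made-up path/table/column names: both versions should print the same Catalyst plans.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)                     // assumes an existing SparkContext `sc`
val people = sqlContext.parquetFile("people.parquet")   // hypothetical input
people.registerTempTable("people")

// DataFrame DSL version
people.filter(people("age") > 21).select("name").explain(true)

// Equivalent SQL version -- the two printed plans should match
sqlContext.sql("SELECT name FROM people WHERE age > 21").explain(true)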
Re: JavaRDD method ambiguous after upgrading to Java 8
Seems like upgrading to 1.2.0 fixed the error.
JavaRDD method ambiguous after upgrading to Java 8
The following method demonstrates the issue: private static Tuple2<String, String> group(JavaPairRDD<String, String> rdd, Function<Tuple2<String, String>, String> f) { return rdd.groupBy(f); } I get the following compilation error using Spark 1.1.1 and Java 8u31: The method groupBy(Function<Tuple2<String,String>,String>) is ambiguous for the type JavaPairRDD<String,String> I believe it is ambiguous because both JavaPairRDD and JavaRDDLike offer the same method. Has anyone encountered this before? Is there a solution? Thanks!
Spark Error: Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077
Hi, I am running Spark on my Mac. It reads from a Kafka topic and then writes the data to an HBase table. When I do a spark-submit, I get this error: Error connecting to master spark://localhost:7077 (akka.tcp://sparkMaster@localhost:7077), exiting. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 My submit statement looks like this: ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar --class KafkaMain --master spark://localhost:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g /Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
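A hedged aside: with the standalone master, this InvalidAssociation error often means the URL given to --master (or setMaster) does not exactly match the host:port the master itself advertises in its log and web UI, e.g. the machine's hostname rather than localhost. A minimal Scala sketch, using the krishs-mbp hostname that appears later in this thread as the assumed master address:

import org.apache.spark.{SparkConf, SparkContext}

// The master URL must match the master's advertised address character for character;
// copy it verbatim from the top of the master log or the master web UI.
val conf = new SparkConf()
  .setAppName("KafkaMain")
  .setMaster("spark://krishs-mbp:7077")   // assumed hostname -- use whatever your master reports
val sc = new SparkContext(conf)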
RDD partitions per executor in Cassandra Spark Connector
Hi all, I didn't find the *issues* button on https://github.com/datastax/spark-cassandra-connector/ so posting here. Anyone have an idea why token ranges are grouped into one partition per executor? I expected at least one per core. Any suggestions on how to work around this? Doing a repartition is way too expensive, as I just want more partitions for parallelism, not a reshuffle ... Thanks in advance! Frens Jan
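A tentative pointer, not from the thread: as far as I recall, the 1.x connector sizes its input partitions with spark.cassandra.input.split.size (roughly, Cassandra partitions per Spark partition), so lowering it should produce more input partitions without a shuffle. Treat the property name and the value below as assumptions to verify against your connector version; host, keyspace and table are made up.

import com.datastax.spark.connector._
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.cassandra.connection.host", "127.0.0.1")   // hypothetical host
  .set("spark.cassandra.input.split.size", "10000")      // assumed property; smaller value => more partitions
val sc = new SparkContext(conf.setAppName("split-size-check"))

val rows = sc.cassandraTable("my_keyspace", "my_table")  // hypothetical keyspace/table
println(rows.partitions.length)                          // number of Spark partitions for the scan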
Re: Spark Error: Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077
This is the line, Error connecting to master spark://localhost:7077 (akka.tcp://sparkMaster@localhost:7077), exiting. On Monday, March 2, 2015 2:42 PM, Ted Yu yuzhih...@gmail.com wrote: bq. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 There should be some more output following the above line. Can you post them ? Cheers On Mon, Mar 2, 2015 at 2:06 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com.invalid wrote: Hi, I am running spark on my mac. It is reading from a kafka topic and then writes the data to a hbase table. When I do a spark submit, I get this error, Error connecting to master spark://localhost:7077 (akka.tcp://sparkMaster@localhost:7077), exiting. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 My submit statement looks like this, ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar --class KafkaMain --master spark://localhost:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g /Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
Re: Spark Error: Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077
I ran it with the --verbose option and I see this output Using properties file: null Parsed arguments: master spark://localhost:7077 deployMode cluster executorMemory 1g executorCores null totalExecutorCores null propertiesFile null driverMemory 1g driverCores null driverExtraClassPath null driverExtraLibraryPath null driverExtraJavaOptions null supervise false queue null numExecutors 100 files null pyFiles null archives null mainClass KafkaMain primaryResource file:/Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar name KafkaMain childArgs [] jars file:/Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar verbose true Spark properties used, including those specified through --conf and those from the properties file null: Main class: org.apache.spark.deploy.Client Arguments: --memory 1g launch spark://localhost:7077 file:/Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar KafkaMain System properties: spark.executor.memory - 1g SPARK_SUBMIT - true spark.app.name - KafkaMain spark.jars - file:/Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar,file:/Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar spark.master - spark://localhost:7077 Classpath elements: On Monday, March 2, 2015 2:42 PM, Ted Yu yuzhih...@gmail.com wrote: bq. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 There should be some more output following the above line. Can you post them ? Cheers On Mon, Mar 2, 2015 at 2:06 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com.invalid wrote: Hi, I am running spark on my mac. It is reading from a kafka topic and then writes the data to a hbase table. When I do a spark submit, I get this error, Error connecting to master spark://localhost:7077 (akka.tcp://sparkMaster@localhost:7077), exiting. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 My submit statement looks like this, ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar --class KafkaMain --master spark://localhost:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g /Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
Re: Executing hive query from Spark code
Here is a snippet of the dependency tree for the spark-hive module:
[INFO] org.apache.spark:spark-hive_2.10:jar:1.3.0-SNAPSHOT
...
[INFO] +- org.spark-project.hive:hive-metastore:jar:0.13.1a:compile
[INFO] |  +- org.spark-project.hive:hive-shims:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-common:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-0.20:jar:0.13.1a:runtime
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-common-secure:jar:0.13.1a:compile
[INFO] |  |  +- org.spark-project.hive.shims:hive-shims-0.20S:jar:0.13.1a:runtime
[INFO] |  |  \- org.spark-project.hive.shims:hive-shims-0.23:jar:0.13.1a:runtime
...
[INFO] +- org.spark-project.hive:hive-exec:jar:0.13.1a:compile
[INFO] |  +- org.spark-project.hive:hive-ant:jar:0.13.1a:compile
[INFO] |  |  \- org.apache.velocity:velocity:jar:1.5:compile
[INFO] |  |     \- oro:oro:jar:2.0.8:compile
[INFO] |  +- org.spark-project.hive:hive-common:jar:0.13.1a:compile
...
[INFO] +- org.spark-project.hive:hive-serde:jar:0.13.1a:compile
bq. is there a way to have the hive support without updating the assembly
I don't think so. On Mon, Mar 2, 2015 at 12:37 PM, nitinkak001 nitinkak...@gmail.com wrote: I want to run a Hive query inside Spark and use the RDDs generated from that inside Spark. I read in the documentation: /Hive support is enabled by adding the -Phive and -Phive-thriftserver flags to Spark's build. This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive./ I just wanted to know what the -Phive and -Phive-thriftserver flags really do and whether there is a way to have the Hive support without updating the assembly. Does that flag add a Hive support jar or something? The reason I am asking is that I will be using the Cloudera version of Spark in the future and I am not sure how to add the Hive support to that Spark distribution.
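For completeness, a minimal Scala sketch of what the calling side looks like once the assembly includes Hive support (built with -Phive); the app name and table name are hypothetical.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

val sc = new SparkContext(new SparkConf().setAppName("hive-from-spark"))
val hiveContext = new HiveContext(sc)

// Runs HiveQL through Spark's execution engine; the result can be used like any other RDD
val rows = hiveContext.sql("SELECT key, value FROM some_hive_table")   // hypothetical table
rows.take(10).foreach(println)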
Spark UI and running spark-submit with --master yarn
Hi, 1. When I run my application with --master yarn-cluster or --master yarn --deploy-mode cluster, I cannot see the Spark UI at the location masternode:4040. Even while the job is running, I cannot see the Spark UI. 2. When I run with --master yarn --deploy-mode client, I see the Spark UI but I cannot see my job running. When I run spark-submit with --master local[*], I see the Spark UI, my job, everything (that's great). Do I need to do some settings to see the UI? Thanks -AJ
Re: Spark UI and running spark-submit with --master yarn
That's the RM's RPC port, not the web UI port. (See Ted's e-mail - normally web UI is on 8088.) On Mon, Mar 2, 2015 at 4:14 PM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi Marcelo, Thanks for the quick reply. I have a EMR cluster and I am running the spark-submit on the master node in the cluster. When I start the spark-submit , I see 15/03/02 23:48:33 INFO client.RMProxy: Connecting to ResourceManager at /172.31.43.254:9022 But If I try that URL or the use the external DNS ec2-52-10-234-111.us-west-2.compute.amazonaws.com:9022 it does not work What am I missing here ? Thanks a lot for the help -AJ On Mon, Mar 2, 2015 at 3:50 PM, Marcelo Vanzin van...@cloudera.com wrote: What are you calling masternode? In yarn-cluster mode, the driver is running somewhere in your cluster, not on the machine where you run spark-submit. The easiest way to get to the Spark UI when using Yarn is to use the Yarn RM's web UI. That will give you a link to the application's UI regardless of whether it's running on client or cluster mode. On Mon, Mar 2, 2015 at 3:39 PM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi , When I run my application with --master yarn-cluster or --master yarn --deploy-mode cluster , I can not the spark UI at the location -- masternode:4040Even if I am running the job , I can not see teh SPARK UI. When I run with --master yarn --deploy-mode client -- I see the Spark UI but I cannot see my job running. When I run spark-submit with --master local[*] , I see the spark UI , my job everything (Thats great) Do I need to do some settings to see the UI? Thanks -AJ -- Marcelo -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark UI and running spark-submit with --master yarn
Default RM Web UI port is 8088 (configurable through yarn.resourcemanager.webapp.address) Cheers On Mon, Mar 2, 2015 at 4:14 PM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi Marcelo, Thanks for the quick reply. I have a EMR cluster and I am running the spark-submit on the master node in the cluster. When I start the spark-submit , I see 15/03/02 23:48:33 INFO client.RMProxy: Connecting to ResourceManager at / 172.31.43.254:9022 But If I try that URL or the use the external DNS ec2-52-10-234-111.us-west-2.compute.amazonaws.com:9022 it does not work What am I missing here ? Thanks a lot for the help -AJ On Mon, Mar 2, 2015 at 3:50 PM, Marcelo Vanzin van...@cloudera.com wrote: What are you calling masternode? In yarn-cluster mode, the driver is running somewhere in your cluster, not on the machine where you run spark-submit. The easiest way to get to the Spark UI when using Yarn is to use the Yarn RM's web UI. That will give you a link to the application's UI regardless of whether it's running on client or cluster mode. On Mon, Mar 2, 2015 at 3:39 PM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi , When I run my application with --master yarn-cluster or --master yarn --deploy-mode cluster , I can not the spark UI at the location -- masternode:4040Even if I am running the job , I can not see teh SPARK UI. When I run with --master yarn --deploy-mode client -- I see the Spark UI but I cannot see my job running. When I run spark-submit with --master local[*] , I see the spark UI , my job everything (Thats great) Do I need to do some settings to see the UI? Thanks -AJ -- Marcelo
Re: Spark UI and running spark-submit with --master yarn
What are you calling masternode? In yarn-cluster mode, the driver is running somewhere in your cluster, not on the machine where you run spark-submit. The easiest way to get to the Spark UI when using Yarn is to use the Yarn RM's web UI. That will give you a link to the application's UI regardless of whether it's running on client or cluster mode. On Mon, Mar 2, 2015 at 3:39 PM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi , When I run my application with --master yarn-cluster or --master yarn --deploy-mode cluster , I can not the spark UI at the location -- masternode:4040Even if I am running the job , I can not see teh SPARK UI. When I run with --master yarn --deploy-mode client -- I see the Spark UI but I cannot see my job running. When I run spark-submit with --master local[*] , I see the spark UI , my job everything (Thats great) Do I need to do some settings to see the UI? Thanks -AJ -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded
Sab, not sure what you require for the similarity metric or your use case but you can also look at spark-rowsimilarity or spark-itemsimilarity (column-wise) here http://mahout.apache.org/users/recommender/intro-cooccurrence-spark.html http://mahout.apache.org/users/recommender/intro-cooccurrence-spark.html. These are optimized for LLR based “similarity” which is very simple to calculate since you don’t use either the item weight or the entire row or column vector values. Downsampling is done by number of values per column (or row) and by LLR strength. This keeps it to O(n) They run pretty fast and only use memory if you use the version that attaches application IDs to the rows and columns. Using SimilarityAnalysis.cooccurrence may help. It’s in the Spark/Scala part of Mahout. On Mar 2, 2015, at 12:56 PM, Reza Zadeh r...@databricks.com wrote: Hi Sab, The current method is optimized for having many rows and few columns. In your case it is exactly the opposite. We are working on your case, tracked by this JIRA: https://issues.apache.org/jira/browse/SPARK-4823 https://issues.apache.org/jira/browse/SPARK-4823 Your case is very common, so I will put some time into building it. In the meantime, if you're looking for groups of similar points, consider using K-means - it will get you clusters of similar rows with euclidean distance. Best, Reza On Sun, Mar 1, 2015 at 9:36 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com mailto:sabarish.sasidha...@manthan.com wrote: Hi Reza I see that ((int, int), double) pairs are generated for any combination that meets the criteria controlled by the threshold. But assuming a simple 1x10K matrix that means I would need atleast 12GB memory per executor for the flat map just for these pairs excluding any other overhead. Is that correct? How can we make this scale for even larger n (when m stays small) like 100 x 5 million. One is by using higher thresholds. The other is that I use a SparseVector to begin with. Are there any other optimizations I can take advantage of? Thanks Sab
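For reference, a minimal Scala sketch of the DIMSUM call being discussed; the threshold trades accuracy for work, the tiny input is obviously made up, and an existing SparkContext `sc` is assumed.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.RowMatrix

// Hypothetical rows; the current implementation favours many rows and few columns
val rows = sc.parallelize(Seq(
  Vectors.dense(1.0, 0.0, 3.0),
  Vectors.dense(0.0, 2.0, 1.0),
  Vectors.dense(4.0, 1.0, 0.0)))

val mat = new RowMatrix(rows)
// With a threshold, DIMSUM samples aggressively and skips column pairs below it
val sims = mat.columnSimilarities(0.1)
sims.entries.take(10).foreach(println)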
RE: Performance tuning in Spark SQL.
Hi, Thank you for your reply. It is surely going to help. Regards, Abhishek Dubey
From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Monday, March 02, 2015 6:52 PM To: Abhishek Dubey; user@spark.apache.org Subject: RE: Performance tuning in Spark SQL.
This is actually quite an open question. From my understanding, there are probably ways to tune like:
* SQL configurations, for example (configuration key / default value):
  spark.sql.autoBroadcastJoinThreshold   10 * 1024 * 1024
  spark.sql.defaultSizeInBytes           10 * 1024 * 1024 + 1
  spark.sql.planner.externalSort         false
  spark.sql.shuffle.partitions           200
  spark.sql.codegen                      false
* Spark cluster / application configuration (memory, GC, Spark core number, etc.)
* Try using cached tables / Parquet files as the storage.
* EXPLAIN [EXTENDED] query is your best friend for tuning the SQL itself.
* ...
And a real use case scenario would probably be more helpful in answering your question.
-Original Message- From: dubey_a [mailto:abhishek.du...@xoriant.com] Sent: Monday, March 2, 2015 6:02 PM To: user@spark.apache.org Subject: Performance tuning in Spark SQL.
What are the ways to tune query performance in Spark SQL?
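As a hedged, concrete illustration of the knobs above, a short Scala sketch; the table name, query and values are made up, and an existing SparkContext `sc` is assumed.

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)   // assumes an existing SparkContext `sc`

// Per-context SQL settings from the list above
sqlContext.setConf("spark.sql.shuffle.partitions", "400")
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)

// Cache a hot table in memory and inspect the plan of an expensive query
sqlContext.cacheTable("events")   // hypothetical registered table
sqlContext.sql("EXPLAIN EXTENDED SELECT country, COUNT(*) FROM events GROUP BY country")
  .collect().foreach(println)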
Re: Spark UI and running spark-submit with --master yarn
Hi Marcelo, Thanks for the quick reply. I have an EMR cluster and I am running the spark-submit on the master node in the cluster. When I start the spark-submit, I see 15/03/02 23:48:33 INFO client.RMProxy: Connecting to ResourceManager at /172.31.43.254:9022 But if I try that URL or use the external DNS ec2-52-10-234-111.us-west-2.compute.amazonaws.com:9022 it does not work. What am I missing here? Thanks a lot for the help -AJ On Mon, Mar 2, 2015 at 3:50 PM, Marcelo Vanzin van...@cloudera.com wrote: What are you calling masternode? In yarn-cluster mode, the driver is running somewhere in your cluster, not on the machine where you run spark-submit. The easiest way to get to the Spark UI when using Yarn is to use the Yarn RM's web UI. That will give you a link to the application's UI regardless of whether it's running in client or cluster mode. On Mon, Mar 2, 2015 at 3:39 PM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi, When I run my application with --master yarn-cluster or --master yarn --deploy-mode cluster, I cannot see the Spark UI at the location masternode:4040. Even while the job is running, I cannot see the Spark UI. When I run with --master yarn --deploy-mode client, I see the Spark UI but I cannot see my job running. When I run spark-submit with --master local[*], I see the Spark UI, my job, everything (that's great). Do I need to do some settings to see the UI? Thanks -AJ -- Marcelo
Re: Spark UI and running spark-submit with --master yarn
That does not look like the RM UI. Please check your configuration for the port (see Ted's e-mail). On Mon, Mar 2, 2015 at 4:45 PM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi , port 8088 does not show me anything .(can not connect) where as port ec2-52-10-234-111.us-west-2.compute.amazonaws.com:9026 shows me all the applications. Do I have to do anything for the port 8088 or whatever I am seeing at 9026 port is good .Attached is screenshot . Thanks AJ On Mon, Mar 2, 2015 at 4:24 PM, Marcelo Vanzin van...@cloudera.com wrote: That's the RM's RPC port, not the web UI port. (See Ted's e-mail - normally web UI is on 8088.) On Mon, Mar 2, 2015 at 4:14 PM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi Marcelo, Thanks for the quick reply. I have a EMR cluster and I am running the spark-submit on the master node in the cluster. When I start the spark-submit , I see 15/03/02 23:48:33 INFO client.RMProxy: Connecting to ResourceManager at /172.31.43.254:9022 But If I try that URL or the use the external DNS ec2-52-10-234-111.us-west-2.compute.amazonaws.com:9022 it does not work What am I missing here ? Thanks a lot for the help -AJ On Mon, Mar 2, 2015 at 3:50 PM, Marcelo Vanzin van...@cloudera.com wrote: What are you calling masternode? In yarn-cluster mode, the driver is running somewhere in your cluster, not on the machine where you run spark-submit. The easiest way to get to the Spark UI when using Yarn is to use the Yarn RM's web UI. That will give you a link to the application's UI regardless of whether it's running on client or cluster mode. On Mon, Mar 2, 2015 at 3:39 PM, Anupama Joshi anupama.jo...@gmail.com wrote: Hi , When I run my application with --master yarn-cluster or --master yarn --deploy-mode cluster , I can not the spark UI at the location -- masternode:4040Even if I am running the job , I can not see teh SPARK UI. When I run with --master yarn --deploy-mode client -- I see the Spark UI but I cannot see my job running. When I run spark-submit with --master local[*] , I see the spark UI , my job everything (Thats great) Do I need to do some settings to see the UI? Thanks -AJ -- Marcelo -- Marcelo -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Error: Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077
There is no output after this line Sent from my iPhone On Mar 2, 2015, at 2:40 PM, Ted Yu yuzhih...@gmail.com wrote: bq. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 There should be some more output following the above line. Can you post them ? Cheers On Mon, Mar 2, 2015 at 2:06 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com.invalid wrote: Hi, I am running spark on my mac. It is reading from a kafka topic and then writes the data to a hbase table. When I do a spark submit, I get this error, Error connecting to master spark://localhost:7077 (akka.tcp://sparkMaster@localhost:7077), exiting. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 My submit statement looks like this, ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar --class KafkaMain --master spark://localhost:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g /Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
Problems running version 1.3.0-rc1
Hi all, I have downloaded version 1.3.0-rc1 from https://github.com/apache/spark/archive/v1.3.0-rc1.zip, extracted it and built it using: mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.5.0 -DskipTests clean package It doesn't complain for any issues, but when I call sbin/start-all.sh I get on logs: 15/03/02 21:28:24 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkWorker-akka.remote.default-remote-dispatcher-6] shutting down ActorSystem [sparkWorker] java.lang.NoClassDefFoundError: L akka/event/LogSou at java.lang.Class.getDeclaredConstructors0(Native Method) at java.lang.Class.privateGetDeclaredConstructors(Class.java:2663) at java.lang.Class.getConstructor0(Class.java:3067) at java.lang.Class.getDeclaredConstructor(Class.java:2170) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$2.apply(DynamicAccess.scala:76) at scala.util.Try$.apply(Try.scala:161) at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:73) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) at akka.actor.ReflectiveDynamicAccess$$anonfun$createInstanceFor$3.apply(DynamicAccess.scala:84) at scala.util.Success.flatMap(Try.scala:200) at akka.actor.ReflectiveDynamicAccess.createInstanceFor(DynamicAccess.scala:84) at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:692) at akka.remote.EndpointManager$$anonfun$9.apply(Remoting.scala:684) at scala.collection.TraversableLike$WithFilter$$anonfun$map$2.apply(TraversableLike.scala:722) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$WithFilter.map(TraversableLike.scala:721) at akka.remote.EndpointManager.akka$remote$EndpointManager$$listens(Remoting.scala:684) at akka.remote.EndpointManager$$anonfun$receive$2.applyOrElse(Remoting.scala:492) at akka.actor.Actor$class.aroundReceive(Actor.scala:465) at akka.remote.EndpointManager.aroundReceive(Remoting.scala:395) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) at akka.actor.ActorCell.invoke(ActorCell.scala:487) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) at akka.dispatch.Mailbox.run(Mailbox.scala:220) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.ClassNotFoundException: L akka.event.LogSou at java.net.URLClassLoader$1.run(URLClassLoader.java:372) at java.net.URLClassLoader$1.run(URLClassLoader.java:361) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:360) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 32 more I tried to search online but couldn't find anything similar. Any ideas what could the error be? I tried compiling with java 7 and java 8 but with the same result. Thanks a lot!
RE: Is SQLContext thread-safe?
Thanks for the response. Then I have another question: when will we want to create multiple SQLContext instances from the same SparkContext? What's the benefit? -Original Message- From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Monday, March 02, 2015 9:05 PM To: Haopu Wang; user Subject: RE: Is SQLContext thread-safe? Yes it is thread safe, at least it's supposed to be. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Monday, March 2, 2015 4:43 PM To: user Subject: Is SQLContext thread-safe? Hi, is it safe to use the same SQLContext to do Select operations in different threads at the same time? Thank you very much!
RE: Is SQLContext thread-safe?
Currently, each SQLContext has its own configuration, e.g. shuffle partition number, codegen, etc., and it is shared among the multiple threads running. We have actually had some internal discussions on this, and will probably provide a thread-local configuration for a single SQLContext instance in the future. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Tuesday, March 3, 2015 7:56 AM To: Cheng, Hao; user Subject: RE: Is SQLContext thread-safe? Thanks for the response. Then I have another question: when will we want to create multiple SQLContext instances from the same SparkContext? What's the benefit? -Original Message- From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Monday, March 02, 2015 9:05 PM To: Haopu Wang; user Subject: RE: Is SQLContext thread-safe? Yes it is thread safe, at least it's supposed to be. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Monday, March 2, 2015 4:43 PM To: user Subject: Is SQLContext thread-safe? Hi, is it safe to use the same SQLContext to do Select operations in different threads at the same time? Thank you very much!
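A small Scala sketch of the usage being discussed — one SQLContext shared by several threads issuing queries concurrently. The table names are hypothetical, and `sqlContext` is assumed to be a single, already-created SQLContext with those tables registered.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

val queries = Seq(
  "SELECT COUNT(*) FROM clicks",                      // hypothetical table
  "SELECT user, COUNT(*) FROM clicks GROUP BY user")

// Each future runs on its own thread but uses the same SQLContext instance
val futures = queries.map(q => Future { sqlContext.sql(q).collect() })
val results = futures.map(Await.result(_, 10.minutes))

// Note: settings such as spark.sql.shuffle.partitions live on the shared SQLContext,
// so all threads observe the same values.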
Re: Spark Error: Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077
In AkkaUtils.scala: val akkaLogLifecycleEvents = conf.getBoolean(spark.akka.logLifecycleEvents, false) Can you turn on life cycle event logging to see if you would get some more clue ? Cheers On Mon, Mar 2, 2015 at 3:56 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com wrote: I see these messages now, spark.master - spark://krishs-mbp:7077 Classpath elements: Sending launch command to spark://krishs-mbp:7077 Driver successfully submitted as driver-20150302155433- ... waiting before polling master for driver state ... polling master for driver state State of driver-20150302155433- is FAILED On Monday, March 2, 2015 2:42 PM, Ted Yu yuzhih...@gmail.com wrote: bq. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 There should be some more output following the above line. Can you post them ? Cheers On Mon, Mar 2, 2015 at 2:06 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com.invalid wrote: Hi, I am running spark on my mac. It is reading from a kafka topic and then writes the data to a hbase table. When I do a spark submit, I get this error, Error connecting to master spark://localhost:7077 (akka.tcp://sparkMaster@localhost:7077), exiting. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 My submit statement looks like this, ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar --class KafkaMain --master spark://localhost:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g /Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
RE: Is SQLContext thread-safe?
Hao, thank you so much for the reply! Do you already have some JIRA for the discussion? -Original Message- From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Tuesday, March 03, 2015 8:23 AM To: Haopu Wang; user Subject: RE: Is SQLContext thread-safe? Currently, each SQLContext has its own configuration, e.g. shuffle partition number, codegen etc. and it will be shared among the multiple threads running. We actually has some internal discussions on this, probably will provide a thread local configuration in the future for a single SQLContext instance. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Tuesday, March 3, 2015 7:56 AM To: Cheng, Hao; user Subject: RE: Is SQLContext thread-safe? Thanks for the response. Then I have another question: when will we want to create multiple SQLContext instances from the same SparkContext? What's the benefit? -Original Message- From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Monday, March 02, 2015 9:05 PM To: Haopu Wang; user Subject: RE: Is SQLContext thread-safe? Yes it is thread safe, at least it's supposed to be. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Monday, March 2, 2015 4:43 PM To: user Subject: Is SQLContext thread-safe? Hi, is it safe to use the same SQLContext to do Select operations in different threads at the same time? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: throughput in the web console?
I performed repartitioning and everything went fine with respect to the number of CPU cores being used (and respective times). However, I noticed something very strange: inside a map operation I was doing a very simple calculation and always using the same dataset (small enough to be entirely processed in the same batch); then I iterated the RDDs and calculated the mean, foreachRDD(rdd = println(MEAN: + rdd.mean())). I noticed that for different numbers of partitions (for instance, 4 and 8), the result of the mean is different. Why does this happen? On Thu, Feb 26, 2015 at 7:03 PM, Tathagata Das t...@databricks.com wrote: If you have one receiver, and you are doing only map-like operaitons then the process will primarily happen on one machine. To use all the machines, either receiver in parallel with multiple receivers, or spread out the computation by explicitly repartitioning the received streams (DStream.repartition) with sufficient partitions to load balance across more machines. TD On Thu, Feb 26, 2015 at 9:52 AM, Saiph Kappa saiph.ka...@gmail.com wrote: One more question: while processing the exact same batch I noticed that giving more CPUs to the worker does not decrease the duration of the batch. I tried this with 4 and 8 CPUs. Though, I noticed that giving only 1 CPU the duration increased, but apart from that the values were pretty similar, whether I was using 4 or 6 or 8 CPUs. On Thu, Feb 26, 2015 at 5:35 PM, Saiph Kappa saiph.ka...@gmail.com wrote: By setting spark.eventLog.enabled to true it is possible to see the application UI after the application has finished its execution, however the Streaming tab is no longer visible. For measuring the duration of batches in the code I am doing something like this: «wordCharValues.foreachRDD(rdd = { val startTick = System.currentTimeMillis() val result = rdd.take(1) val timeDiff = System.currentTimeMillis() - startTick» But my quesiton is: is it possible to see the rate/throughput (records/sec) when I have a stream to process log files that appear in a folder? On Thu, Feb 26, 2015 at 1:36 AM, Tathagata Das t...@databricks.com wrote: Yes. # tuples processed in a batch = sum of all the tuples received by all the receivers. In screen shot, there was a batch with 69.9K records, and there was a batch which took 1 s 473 ms. These two batches can be the same, can be different batches. TD On Wed, Feb 25, 2015 at 10:11 AM, Josh J joshjd...@gmail.com wrote: If I'm using the kafka receiver, can I assume the number of records processed in the batch is the sum of the number of records processed by the kafka receiver? So in the screen shot attached the max rate of tuples processed in a batch is 42.7K + 27.2K = 69.9K tuples processed in a batch with a max processing time of 1 second 473 ms? On Wed, Feb 25, 2015 at 8:48 AM, Akhil Das ak...@sigmoidanalytics.com wrote: By throughput you mean Number of events processed etc? [image: Inline image 1] Streaming tab already have these statistics. Thanks Best Regards On Wed, Feb 25, 2015 at 9:59 PM, Josh J joshjd...@gmail.com wrote: On Wed, Feb 25, 2015 at 7:54 AM, Akhil Das ak...@sigmoidanalytics.com wrote: For SparkStreaming applications, there is already a tab called Streaming which displays the basic statistics. Would I just need to extend this tab to add the throughput? - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
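A hedged aside on the differing means: one common, benign explanation is that double-precision addition is not associative, and the per-partition partial results get combined in a different order when the partition count changes, so the final value can differ in the low-order digits. A tiny Scala illustration of order sensitivity (pure Scala, no Spark needed):

// Adding the same three numbers in two different orders gives different doubles:
val a = (1.0e16 + 1.0) + -1.0e16   // 0.0 -- the 1.0 is absorbed into 1.0e16
val b = (1.0e16 + -1.0e16) + 1.0   // 1.0
println(a == b)                    // false

If the difference you see is large rather than in the last few digits, that points to something else, e.g. the computation inside the map not being deterministic per element.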
Re: Spark Error: Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077
I see these messages now, spark.master - spark://krishs-mbp:7077 Classpath elements: Sending launch command to spark://krishs-mbp:7077 Driver successfully submitted as driver-20150302155433- ... waiting before polling master for driver state ... polling master for driver state State of driver-20150302155433- is FAILED On Monday, March 2, 2015 2:42 PM, Ted Yu yuzhih...@gmail.com wrote: bq. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 There should be some more output following the above line. Can you post them ? Cheers On Mon, Mar 2, 2015 at 2:06 PM, Krishnanand Khambadkone kkhambadk...@yahoo.com.invalid wrote: Hi, I am running spark on my mac. It is reading from a kafka topic and then writes the data to a hbase table. When I do a spark submit, I get this error, Error connecting to master spark://localhost:7077 (akka.tcp://sparkMaster@localhost:7077), exiting. Cause was: akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@localhost:7077 My submit statement looks like this, ./spark-submit --jars /Users/hadoop/kafka-0.8.1.1-src/core/build/libs/kafka_2.8.0-0.8.1.1.jar --class KafkaMain --master spark://localhost:7077 --deploy-mode cluster --num-executors 100 --driver-memory 1g --executor-memory 1g /Users/hadoop/dev/kafkahbase/kafka/src/sparkhbase.jar
Re: Problem getting program to run on 15TB input
Everything works smoothly if I do the 99%-removal filter in Hive first. So, all the baggage from garbage collection was breaking it. Is there a way to filter() out 99% of the data without having to garbage collect 99% of the RDD? On Sun, Mar 1, 2015 at 9:56 AM, Arun Luthra arun.lut...@gmail.com wrote: I tried a shorter, simpler version of the program, with just 1 RDD, essentially it is: sc.textFile(..., N).map().filter().map( blah => (id, 1L)).reduceByKey().saveAsTextFile(...) Here is a typical GC log trace from one of the yarn container logs:
54.040: [GC [PSYoungGen: 9176064K->28206K(10704896K)] 9176064K->28278K(35171840K), 0.0234420 secs] [Times: user=0.15 sys=0.01, real=0.02 secs]
77.864: [GC [PSYoungGen: 9204270K->150553K(10704896K)] 9204342K->150641K(35171840K), 0.0423020 secs] [Times: user=0.30 sys=0.26, real=0.04 secs]
79.485: [GC [PSYoungGen: 9326617K->333519K(10704896K)] 9326705K->333615K(35171840K), 0.0774990 secs] [Times: user=0.35 sys=1.28, real=0.08 secs]
92.974: [GC [PSYoungGen: 9509583K->193370K(10704896K)] 9509679K->193474K(35171840K), 0.0241590 secs] [Times: user=0.35 sys=0.11, real=0.02 secs]
114.842: [GC [PSYoungGen: 9369434K->123577K(10704896K)] 9369538K->123689K(35171840K), 0.0201000 secs] [Times: user=0.31 sys=0.00, real=0.02 secs]
117.277: [GC [PSYoungGen: 9299641K->135459K(11918336K)] 9299753K->135579K(36385280K), 0.0244820 secs] [Times: user=0.19 sys=0.25, real=0.02 secs]
So ~9GB is getting GC'ed every few seconds, which seems like a lot. Question: The filter() is removing 99% of the data. Does this 99% of the data get GC'ed? Now, I was able to finally get to reduceByKey() by reducing the number of executor-cores (to 2), based on suggestions at http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-OutOfMemoryError-java-lang-OutOfMemoryError-GC-overhead-limit-exceeded-td9036.html . This makes everything before reduceByKey() run pretty smoothly. I ran this with more executor-memory and fewer executors (the most important thing was fewer executor-cores):
--num-executors 150 \
--driver-memory 15g \
--executor-memory 110g \
--executor-cores 32 \
But then, reduceByKey() fails with: java.lang.OutOfMemoryError: Java heap space On Sat, Feb 28, 2015 at 12:09 PM, Arun Luthra arun.lut...@gmail.com wrote: The Spark UI names the line number and name of the operation (repartition in this case) that it is performing. Only if this information is wrong (just a possibility) could it have started groupByKey already. I will try to analyze the amount of skew in the data by using reduceByKey (or simply countByKey), which is relatively inexpensive. For the purposes of this algorithm I can simply log and remove keys with huge counts before doing groupByKey. On Sat, Feb 28, 2015 at 11:38 AM, Aaron Davidson ilike...@gmail.com wrote: All stated symptoms are consistent with GC pressure (other nodes time out trying to connect because of a long stop-the-world), quite possibly due to groupByKey. groupByKey is a very expensive operation as it may bring all the data for a particular partition into memory (in particular, it cannot spill values for a single key, so if you have a single very skewed key you can get behavior like this). On Sat, Feb 28, 2015 at 11:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: But groupByKey will repartition according to the number of keys, as I understand how it works. How do you know that you haven't reached the groupByKey phase? Are you using a profiler or do you base that assumption only on logs?
On Sat, Feb 28, 2015 at 8:12 PM, Arun Luthra arun.lut...@gmail.com wrote: A correction to my first post: There is also a repartition right before groupByKey to help avoid the too-many-open-files error: rdd2.union(rdd1).map(...).filter(...).repartition(15000).groupByKey().map(...).flatMap(...).saveAsTextFile() On Sat, Feb 28, 2015 at 11:10 AM, Arun Luthra arun.lut...@gmail.com wrote: The job fails before getting to groupByKey. I see a lot of timeout errors in the yarn logs, like: 15/02/28 12:47:16 WARN util.AkkaUtils: Error sending message in 1 attempts akka.pattern.AskTimeoutException: Timed out and 15/02/28 12:47:49 WARN util.AkkaUtils: Error sending message in 2 attempts java.util.concurrent.TimeoutException: Futures timed out after [30 seconds] and some of these are followed by: 15/02/28 12:48:02 ERROR executor.CoarseGrainedExecutorBackend: Driver Disassociated [akka.tcp://sparkExecutor@...] -> [akka.tcp://sparkDriver@...] disassociated! Shutting down. 15/02/28 12:48:02 ERROR executor.Executor: Exception in task 421027.0 in stage 1.0 (TID 336601) java.io.FileNotFoundException: /hadoop/yarn/local//spark-local-20150228123450-3a71/36/shuffle_0_421027_0 (No such file or directory) On Sat, Feb 28, 2015 at 9:33 AM, Paweł Szulc paul.sz...@gmail.com wrote: I would first check whether there is any possibility that after doing groupbykey one of the groups does not fit
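A hedged sketch (not from the thread) of the skew-check idea mentioned above — count per key cheaply with reduceByKey, then log and exclude heavy keys before the expensive groupByKey. The data, key type and threshold are made up, and an existing SparkContext `sc` is assumed.

import org.apache.spark.SparkContext._   // pair-RDD functions (needed on older Spark versions)

// Hypothetical keyed data; in the real job this would be the output of map()/filter()
val keyed = sc.parallelize(Seq(("a", "x"), ("a", "y"), ("b", "z")))

// Cheap skew check: per-key counts without materializing any groups
val counts = keyed.mapValues(_ => 1L).reduceByKey(_ + _)
val heavyKeys = counts.filter { case (_, n) => n > 1000000L }.keys.collect().toSet

// Log the heavy hitters and drop (or handle separately) those keys before groupByKey
heavyKeys.foreach(k => println(s"skipping skewed key: $k"))
val grouped = keyed.filter { case (k, _) => !heavyKeys.contains(k) }.groupByKey()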
Re: SparkSQL Timestamp query failure
Thank you Alessandro :) On Tue, Mar 3, 2015 at 10:03 AM, whitebread [via Apache Spark User List] ml-node+s1001560n2188...@n3.nabble.com wrote: Anu, 1) I defined my class Header as follows: case class Header(timestamp: java.sql.Timestamp, c_ip: String, cs_username: String, s_ip: String, s_port: String, cs_method: String, cs_uri_stem: String, cs_query: String, sc_status: Int, sc_bytes: Int, cs_bytes: Int, time_taken: Int, User_Agent: String, Referrer: String) 2) Defined a function to transform a date into a timestamp: implicit def date2timestamp(date: java.util.Date) = new java.sql.Timestamp(date.getTime) 3) Defined the format of my timestamp: val formatTime = new java.text.SimpleDateFormat("yyyy-MM-dd hh:mm:ss") 4) Finally, I was able to parse my data: val tableMod = toProcessLogs.map(_.split(" ")).map(p => Header(date2timestamp(formatTime.parse(p(0)+" "+p(1))), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9).trim.toInt, p(10).trim.toInt, p(11).trim.toInt, p(12).trim.toInt, p(13), p(14))) Hope this helps, Alessandro
Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded
Thanks Debasish, Reza and Pat. In my case, I am doing an SVD and then doing the similarities computation. So a rowSimiliarities() would be a good fit, looking forward to it. In the meanwhile I will try to see if I can further limit the number of similarities computed through some other fashion or use kmeans instead or a combination of both. I have also been looking at Mahout's similarity recommenders based on spark, but not sure if the row similarity would apply in my case as my matrix is pretty dense. Regards Sab On Tue, Mar 3, 2015 at 7:11 AM, Pat Ferrel p...@occamsmachete.com wrote: Sab, not sure what you require for the similarity metric or your use case but you can also look at spark-rowsimilarity or spark-itemsimilarity (column-wise) here http://mahout.apache.org/users/recommender/intro-cooccurrence-spark.html. These are optimized for LLR based “similarity” which is very simple to calculate since you don’t use either the item weight or the entire row or column vector values. Downsampling is done by number of values per column (or row) and by LLR strength. This keeps it to O(n) They run pretty fast and only use memory if you use the version that attaches application IDs to the rows and columns. Using SimilarityAnalysis.cooccurrence may help. It’s in the Spark/Scala part of Mahout. On Mar 2, 2015, at 12:56 PM, Reza Zadeh r...@databricks.com wrote: Hi Sab, The current method is optimized for having many rows and few columns. In your case it is exactly the opposite. We are working on your case, tracked by this JIRA: https://issues.apache.org/jira/browse/SPARK-4823 Your case is very common, so I will put some time into building it. In the meantime, if you're looking for groups of similar points, consider using K-means - it will get you clusters of similar rows with euclidean distance. Best, Reza On Sun, Mar 1, 2015 at 9:36 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote: Hi Reza I see that ((int, int), double) pairs are generated for any combination that meets the criteria controlled by the threshold. But assuming a simple 1x10K matrix that means I would need atleast 12GB memory per executor for the flat map just for these pairs excluding any other overhead. Is that correct? How can we make this scale for even larger n (when m stays small) like 100 x 5 million. One is by using higher thresholds. The other is that I use a SparseVector to begin with. Are there any other optimizations I can take advantage of? Thanks Sab -- Architect - Big Data Ph: +91 99805 99458 Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan India ICT)* +++
LBGFS optimizer performace
Hi there: I'm using LBFGS optimizer to train a logistic regression model. The code I implemented follows the pattern showed in https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but training data is obtained from a Spark SQL RDD. The problem I'm having is that LBFGS tries to count the elements in my RDD and that results in a OOM exception since my dataset is huge. I'm running on a AWS EMR cluster with 16 c3.2xlarge instances on Hadoop YARN. My dataset is about 150 GB but I sample (I take only 1% of the data) it in order to scale logistic regression. The exception I'm getting is this: 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.init(String.java:203) at com.esotericsoftware.kryo.io.Input.readString(Input.java:448) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.joins.HashOuterJoin.org $apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179) at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199) at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) I'm using this parameters at runtime: --num-executors 128 --executor-memory 1G --driver-memory 4G --conf spark.serializer=org.apache.spark.serializer.KryoSerializer 
--conf spark.storage.memoryFraction=0.2 I also persist my dataset using MEMORY_AND_DISK_SER but get the same error. I will appreciate any help on this problem. I have been trying to solve it for days and I'm running out of time and hair. Thanks Gustavo
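A hedged sketch of the sampling/caching shape that usually helps here (not Gustavo's actual code): sample down before handing the data to the optimizer and persist the sampled RDD, so the count() the optimizer performs hits the cache instead of recomputing the SQL join. `training` is an assumed RDD[LabeledPoint] built from the Spark SQL rows, and the sample fraction mirrors the 1% mentioned above.

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.storage.StorageLevel

// training: RDD[LabeledPoint] is assumed to exist (built from the SQL query results)
val sampled = training
  .sample(withReplacement = false, fraction = 0.01, seed = 42L)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
sampled.count()   // materialize once, up front, on the small sample

val model = new LogisticRegressionWithLBFGS().setNumClasses(2).run(sampled)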
Re: LBGFS optimizer performace
Can you try increasing your driver memory, reducing the executors and increasing the executor memory? Thanks Best Regards On Tue, Mar 3, 2015 at 10:09 AM, Gustavo Enrique Salazar Torres gsala...@ime.usp.br wrote: Hi there: I'm using LBFGS optimizer to train a logistic regression model. The code I implemented follows the pattern showed in https://spark.apache.org/docs/1.2.0/mllib-linear-methods.html but training data is obtained from a Spark SQL RDD. The problem I'm having is that LBFGS tries to count the elements in my RDD and that results in a OOM exception since my dataset is huge. I'm running on a AWS EMR cluster with 16 c3.2xlarge instances on Hadoop YARN. My dataset is about 150 GB but I sample (I take only 1% of the data) it in order to scale logistic regression. The exception I'm getting is this: 15/03/03 04:21:44 WARN scheduler.TaskSetManager: Lost task 108.0 in stage 2.0 (TID 7600, ip-10-155-20-71.ec2.internal): java.lang.OutOfMemoryError: Java heap space at java.util.Arrays.copyOfRange(Arrays.java:2694) at java.lang.String.init(String.java:203) at com.esotericsoftware.kryo.io.Input.readString(Input.java:448) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:157) at com.esotericsoftware.kryo.serializers.DefaultSerializers$StringSerializer.read(DefaultSerializers.java:146) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338) at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:651) at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:42) at com.twitter.chill.Tuple2Serializer.read(TupleSerializers.scala:33) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.sql.execution.joins.HashOuterJoin.org $apache$spark$sql$execution$joins$HashOuterJoin$$buildHashTable(HashOuterJoin.scala:179) at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:199) at org.apache.spark.sql.execution.joins.HashOuterJoin$$anonfun$execute$1.apply(HashOuterJoin.scala:196) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) at org.apache.spark.rdd.RDD.iterator(RDD.scala:247) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) at 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280) I'm using this parameters at runtime: --num-executors 128 --executor-memory 1G --driver-memory 4G --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.storage.memoryFraction=0.2 I also persist my dataset using MEMORY_AND_DISK_SER but get the same error. I will appreciate any help on this problem. I have been trying to solve it for days and I'm running out of time and hair. Thanks Gustavo
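For reference, a rough sketch of the training pattern from the linked mllib-linear-methods page, with the 1% sample and MEMORY_AND_DISK_SER persistence the poster describes. It is illustrative only: the input path and LIBSVM loading are stand-ins (the actual data comes from a Spark SQL RDD), and sc is an existing SparkContext.

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.storage.StorageLevel

// Stand-in for the real feature RDD built from Spark SQL.
val data = MLUtils.loadLibSVMFile(sc, "s3://some-bucket/training-data")

// Take the 1% sample mentioned in the thread and keep it serialized,
// spilling to disk rather than failing when memory runs short.
val sampled = data.sample(withReplacement = false, fraction = 0.01, seed = 42L)
  .persist(StorageLevel.MEMORY_AND_DISK_SER)

// LBFGS-based logistic regression from MLlib.
val model = new LogisticRegressionWithLBFGS().run(sampled)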
Re: Exception while select into table.
Hi, Some suggestions: 1. You should tell us the versions of Spark and Hive you are using. 2. You should paste the full stack trace of the exception. In this case, I guess you have a nested directory in the path that bak_startup_log_uid_20150227 points to, and the config field hive.mapred.supports.subdirectories is false by default, so: if (!conf.getBoolVar(HiveConf.ConfVars.HIVE_HADOOP_SUPPORTS_SUBDIRECTORIES) && item.isDir()) { throw new HiveException("checkPaths: " + src.getPath() + " has nested directory " + itemSource); } On 3/3/15 14:36, LinQili wrote: Hi all, I was doing select using spark sql like: insert into table startup_log_uid_20150227 select * from bak_startup_log_uid_20150227 where login_time 1425027600 Usually, it got an exception: org.apache.hadoop.hive.ql.metadata.Hive.checkPaths(Hive.java:2157) org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2298) org.apache.hadoop.hive.ql.metadata.Table.copyFiles(Table.java:686) org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1469) org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:243) org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:137) org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:51) org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) com.nd.home99.LogsProcess$anonfun$main$1$anonfun$apply$1.apply(LogsProcess.scala:286) com.nd.home99.LogsProcess$anonfun$main$1$anonfun$apply$1.apply(LogsProcess.scala:83) scala.collection.immutable.List.foreach(List.scala:318) com.nd.home99.LogsProcess$anonfun$main$1.apply(LogsProcess.scala:83) com.nd.home99.LogsProcess$anonfun$main$1.apply(LogsProcess.scala:82) scala.collection.immutable.List.foreach(List.scala:318) com.nd.home99.LogsProcess$.main(LogsProcess.scala:82) com.nd.home99.LogsProcess.main(LogsProcess.scala) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:601) org.apache.spark.deploy.yarn.ApplicationMaster$anon$2.run(ApplicationMaster.scala:427) Are there any hints about this?
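If the nested-directory guess above is right, one possible workaround (a sketch only, not verified against this exact Spark/Hive combination) is to enable the relevant Hive settings on the HiveContext before re-running the INSERT ... SELECT:

// Assumes an existing SparkContext `sc`; these are standard Hive property names,
// but whether they take effect depends on the Hive version bundled with Spark.
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
hiveContext.setConf("hive.mapred.supports.subdirectories", "true")
hiveContext.setConf("mapred.input.dir.recursive", "true")
// then re-run the original INSERT ... SELECT statement through hiveContext.sql(...)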
Re: unsafe memory access in spark 1.2.1
Not sure, but It could be related to th netty off heap access as described here https://issues.apache.org/jira/browse/SPARK-4516, but the message was different though. Thanks Best Regards On Mon, Mar 2, 2015 at 12:51 AM, Zalzberg, Idan (Agoda) idan.zalzb...@agoda.com wrote: Thanks, We monitor disk space so I doubt that is it, but I will check again *From:* Ted Yu [mailto:yuzhih...@gmail.com] *Sent:* Sunday, March 01, 2015 11:45 PM *To:* Zalzberg, Idan (Agoda) *Cc:* user@spark.apache.org *Subject:* Re: unsafe memory access in spark 1.2.1 Google led me to: https://bugs.openjdk.java.net/browse/JDK-8040802 Not sure if the last comment there applies to your deployment. On Sun, Mar 1, 2015 at 8:32 AM, Zalzberg, Idan (Agoda) idan.zalzb...@agoda.com wrote: My run time version is: java version 1.7.0_75 OpenJDK Runtime Environment (rhel-2.5.4.0.el6_6-x86_64 u75-b13) OpenJDK 64-Bit Server VM (build 24.75-b04, mixed mode) Thanks *From:* Ted Yu [mailto:yuzhih...@gmail.com] *Sent:* Sunday, March 01, 2015 10:18 PM *To:* Zalzberg, Idan (Agoda) *Cc:* user@spark.apache.org *Subject:* Re: unsafe memory access in spark 1.2.1 What Java version are you using ? Thanks On Sun, Mar 1, 2015 at 7:03 AM, Zalzberg, Idan (Agoda) idan.zalzb...@agoda.com wrote: Hi, I am using spark 1.2.1, sometimes I get these errors sporadically: Any thought on what could be the cause? Thanks 2015-02-27 15:08:47 ERROR SparkUncaughtExceptionHandler:96 - Uncaught exception in thread Thread[Executor task launch worker-25,5,main] java.lang.InternalError: a fault occurred in a recent unsafe memory access operation in compiled Java code at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1377) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62) at org.apache.spark.serializer.DeserializationStream$$anon$1.getNext(Serializer.scala:133) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:350) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:365) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) -- This message is confidential and is for the sole use of the intended recipient(s). It may also be privileged or otherwise protected by copyright or other legal rules. If you have received it by mistake please let us know by reply email and delete it from your system. It is prohibited to copy this message or disclose its content to anyone. 
RE: Spark SQL Thrift Server start exception : java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
Copy those jars into the $SPARK_HOME/lib/ datanucleus-api-jdo-3.2.6.jar datanucleus-core-3.2.10.jar datanucleus-rdbms-3.2.9.jar see https://github.com/apache/spark/blob/master/bin/compute-classpath.sh#L120 -Original Message- From: fanooos [mailto:dev.fano...@gmail.com] Sent: Tuesday, March 3, 2015 2:50 PM To: user@spark.apache.org Subject: Spark SQL Thrift Server start exception : java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory I have installed a hadoop cluster (version : 2.6.0), apache spark (version : 1.2.1 preBuilt for hadoop 2.4 and later), and hive (version 1.0.0). When I try to start the spark sql thrift server I am getting the following exception. Exception in thread main java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235) at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231) at scala.Option.orElse(Option.scala:257) at org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231) at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:292) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:248) at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:91) at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:90) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.sql.SQLContext.init(SQLContext.scala:90) at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:72) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:51) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) ... 
26 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410) ... 31 more Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found. NestedThrowables: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310) at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339)
how to clean shuffle write each iteration
I'm using Spark ALS. I set the iteration number to 30, and in each iteration the tasks produce nearly 1 TB of shuffle write. To my surprise, this shuffle data is not cleaned until the whole job finishes, which means I need 30 TB of disk to store the shuffle data. I think that after each iteration we could delete the shuffle data from the previous iterations, right? How can I do this? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/how-to-clean-shuffle-write-each-iteration-tp21886.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark SQL Thrift Server start exception : java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory
I have installed a hadoop cluster (version : 2.6.0), apache spark (version : 1.2.1 preBuilt for hadoop 2.4 and later), and hive (version 1.0.0). When I try to start the spark sql thrift server I am getting the following exception. Exception in thread main java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:346) at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:235) at org.apache.spark.sql.hive.HiveContext$$anonfun$4.apply(HiveContext.scala:231) at scala.Option.orElse(Option.scala:257) at org.apache.spark.sql.hive.HiveContext.x$3$lzycompute(HiveContext.scala:231) at org.apache.spark.sql.hive.HiveContext.x$3(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.hiveconf$lzycompute(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.hiveconf(HiveContext.scala:229) at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:292) at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:276) at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:248) at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:91) at org.apache.spark.sql.SQLContext$$anonfun$2.apply(SQLContext.scala:90) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108) at org.apache.spark.sql.SQLContext.init(SQLContext.scala:90) at org.apache.spark.sql.hive.HiveContext.init(HiveContext.scala:72) at org.apache.spark.sql.hive.thriftserver.SparkSQLEnv$.init(SparkSQLEnv.scala:51) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2$.main(HiveThriftServer2.scala:56) at org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.main(HiveThriftServer2.scala) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.metastore.HiveMetaStoreClient at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1412) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.init(RetryingMetaStoreClient.java:62) at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72) at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453) at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465) at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:340) ... 26 more Caused by: java.lang.reflect.InvocationTargetException at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410) ... 
31 more Caused by: javax.jdo.JDOFatalUserException: Class org.datanucleus.api.jdo.JDOPersistenceManagerFactory was not found. NestedThrowables: java.lang.ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808) at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701) at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:310) at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:339) at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:248) at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:223) at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73) at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133) at org.apache.hadoop.hive.metastore.RawStoreProxy.init(RawStoreProxy.java:58) at
Supporting Hive features in Spark SQL Thrift JDBC server
Hi, According to the Spark SQL documentation, Spark SQL supports the vast majority of Hive features, such as User Defined Functions (UDFs), and one of these UDFs is the current_date() function, which should therefore be supported. However, I get an error when I use this UDF in my SQL query. There are a couple of other UDFs which cause a similar error. Am I missing something in my JDBC server? /Shahab
Re: Architecture of Apache Spark SQL
Here's the whole tech stack around it: [image: Inline image 1] For a bit more details you can refer this slide http://www.slideshare.net/jeykottalam/spark-sqlamp-camp2014?related=1 Previous project was Shark (SQL over spark), you can read about it from here http://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html Thanks Best Regards On Mon, Mar 2, 2015 at 3:18 PM, dubey_a abhishek.du...@xoriant.com wrote: What is the architecture of Apache Spark SQL? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Architecture-of-Apache-Spark-SQL-tp21869.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Some questions after playing a little with the new ml.Pipeline.
I see, thanks for clarifying! I'd recommend following existing implementations in spark.ml transformers. You'll need to define a UDF which operates on a single Row to compute the value for the new column. You can then use the DataFrame DSL to create the new column; the DSL provides a nice syntax for what would otherwise be a SQL statement like select ... from I'm recommending looking at the existing implementation (rather than stating it here) because it changes between Spark 1.2 and 1.3. In 1.3, the DSL is much improved and makes it easier to create a new column. Joseph On Sun, Mar 1, 2015 at 1:26 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: class DeepCNNFeature extends Transformer ... { override def transform(data: DataFrame, paramMap: ParamMap): DataFrame = { // How can I do a map partition on the underlying RDD and then add the column ? } } On Sun, Mar 1, 2015 at 10:23 AM, Jaonary Rabarisoa jaon...@gmail.com wrote: Hi Joseph, Thank your for the tips. I understand what should I do when my data are represented as a RDD. The thing that I can't figure out is how to do the same thing when the data is view as a DataFrame and I need to add the result of my pretrained model as a new column in the DataFrame. Preciselly, I want to implement the following transformer : class DeepCNNFeature extends Transformer ... { } On Sun, Mar 1, 2015 at 1:32 AM, Joseph Bradley jos...@databricks.com wrote: Hi Jao, You can use external tools and libraries if they can be called from your Spark program or script (with appropriate conversion of data types, etc.). The best way to apply a pre-trained model to a dataset would be to call the model from within a closure, e.g.: myRDD.map { myDatum = preTrainedModel.predict(myDatum) } If your data is distributed in an RDD (myRDD), then the above call will distribute the computation of prediction using the pre-trained model. It will require that all of your Spark workers be able to run the preTrainedModel; that may mean installing Caffe and dependencies on all nodes in the compute cluster. For the second question, I would modify the above call as follows: myRDD.mapPartitions { myDataOnPartition = val myModel = // instantiate neural network on this partition myDataOnPartition.map { myDatum = myModel.predict(myDatum) } } I hope this helps! Joseph On Fri, Feb 27, 2015 at 10:27 PM, Jaonary Rabarisoa jaon...@gmail.com wrote: Dear all, We mainly do large scale computer vision task (image classification, retrieval, ...). The pipeline is really great stuff for that. We're trying to reproduce the tutorial given on that topic during the latest spark summit ( http://ampcamp.berkeley.edu/5/exercises/image-classification-with-pipelines.html ) using the master version of spark pipeline and dataframe. The tutorial shows different examples of feature extraction stages before running machine learning algorithms. Even the tutorial is straightforward to reproduce with this new API, we still have some questions : - Can one use external tools (e.g via pipe) as a pipeline stage ? An example of use case is to extract feature learned with convolutional neural network. In our case, this corresponds to a pre-trained neural network with Caffe library (http://caffe.berkeleyvision.org/) . - The second question is about the performance of the pipeline. Library such as Caffe processes the data in batch and instancing one Caffe network can be time consuming when this network is very deep. 
So, we can gain performance if we minimize the number of Caffe network creation and give data in batch to the network. In the pipeline, this corresponds to run transformers that work on a partition basis and give the whole partition to a single caffe network. How can we create such a transformer ? Best, Jao
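A rough sketch of what such a transformer body could look like against the Spark 1.3 DataFrame DSL Joseph refers to (note the archive appears to have stripped the => arrows from the snippets above). Here extractCNNFeatures, the column names, and `data` are made-up placeholders, not an existing API:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.udf

// Placeholder for the real Caffe call; returns a dummy feature vector.
def extractCNNFeatures(imagePath: String): Seq[Double] = Seq(0.0)

// Row-at-a-time: wrap the model call in a UDF and add a column via the DSL.
val cnnFeatures = udf { (path: String) => extractCNNFeatures(path) }
val withFeatures: DataFrame = data.withColumn("cnnFeatures", cnnFeatures(data("imagePath")))

// Partition-at-a-time (one network instantiation per partition), via the underlying RDD:
val byPartition = data.select("id", "imagePath").rdd.mapPartitions { rows =>
  // instantiate the (placeholder) network once per partition here, then map over the rows
  rows.map(r => (r.getString(0), extractCNNFeatures(r.getString(1))))
}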
Re: Executing hive query from Spark code
It should work in CDH without having to recompile. http://eradiating.wordpress.com/2015/02/22/getting-hivecontext-to-work-in-cdh/ --- Original Message --- From: Ted Yu yuzhih...@gmail.com Sent: March 2, 2015 1:35 PM To: nitinkak001 nitinkak...@gmail.com Cc: user user@spark.apache.org Subject: Re: Executing hive query from Spark code Here is snippet of dependency tree for spark-hive module: [INFO] org.apache.spark:spark-hive_2.10:jar:1.3.0-SNAPSHOT ... [INFO] +- org.spark-project.hive:hive-metastore:jar:0.13.1a:compile [INFO] | +- org.spark-project.hive:hive-shims:jar:0.13.1a:compile [INFO] | | +- org.spark-project.hive.shims:hive-shims-common:jar:0.13.1a:compile [INFO] | | +- org.spark-project.hive.shims:hive-shims-0.20:jar:0.13.1a:runtime [INFO] | | +- org.spark-project.hive.shims:hive-shims-common-secure:jar:0.13.1a:compile [INFO] | | +- org.spark-project.hive.shims:hive-shims-0.20S:jar:0.13.1a:runtime [INFO] | | \- org.spark-project.hive.shims:hive-shims-0.23:jar:0.13.1a:runtime ... [INFO] +- org.spark-project.hive:hive-exec:jar:0.13.1a:compile [INFO] | +- org.spark-project.hive:hive-ant:jar:0.13.1a:compile [INFO] | | \- org.apache.velocity:velocity:jar:1.5:compile [INFO] | | \- oro:oro:jar:2.0.8:compile [INFO] | +- org.spark-project.hive:hive-common:jar:0.13.1a:compile ... [INFO] +- org.spark-project.hive:hive-serde:jar:0.13.1a:compile bq. is there a way to have the hive support without updating the assembly I don't think so. On Mon, Mar 2, 2015 at 12:37 PM, nitinkak001 nitinkak...@gmail.com wrote: I want to run Hive query inside Spark and use the RDDs generated from that inside Spark. I read in the documentation /Hive support is enabled by adding the -Phive and -Phive-thriftserver flags to Spark’s build. This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive./ I just wanted to know what -Phive and -Phive-thriftserver flags really do and is there a way to have the hive support without updating the assembly. Does that flag add a hive support jar or something? The reason I am asking is that I will be using Cloudera version of Spark in future and I am not sure how to add the Hive support to that Spark distribution. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executing-hive-query-from-Spark-code-tp21880.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Executing hive query from Spark code
I am not so sure how Spark SQL compiled in CDH, but if didn’t specify the –Phive and –Phive-thriftserver flags during the build, most likely it will not work if just by providing the Hive lib jars later on. For example, does the HiveContext class exist in the assembly jar? I am also quite curious with that, any hint will be appreciated. From: Felix C [mailto:felixcheun...@hotmail.com] Sent: Tuesday, March 3, 2015 12:59 PM To: Ted Yu; nitinkak001 Cc: user Subject: Re: Executing hive query from Spark code It should work in CDH without having to recompile. http://eradiating.wordpress.com/2015/02/22/getting-hivecontext-to-work-in-cdh/ --- Original Message --- From: Ted Yu yuzhih...@gmail.commailto:yuzhih...@gmail.com Sent: March 2, 2015 1:35 PM To: nitinkak001 nitinkak...@gmail.commailto:nitinkak...@gmail.com Cc: user user@spark.apache.orgmailto:user@spark.apache.org Subject: Re: Executing hive query from Spark code Here is snippet of dependency tree for spark-hive module: [INFO] org.apache.spark:spark-hive_2.10:jar:1.3.0-SNAPSHOT ... [INFO] +- org.spark-project.hive:hive-metastore:jar:0.13.1a:compile [INFO] | +- org.spark-project.hive:hive-shims:jar:0.13.1a:compile [INFO] | | +- org.spark-project.hive.shims:hive-shims-common:jar:0.13.1a:compile [INFO] | | +- org.spark-project.hive.shims:hive-shims-0.20:jar:0.13.1a:runtime [INFO] | | +- org.spark-project.hive.shims:hive-shims-common-secure:jar:0.13.1a:compile [INFO] | | +- org.spark-project.hive.shims:hive-shims-0.20S:jar:0.13.1a:runtime [INFO] | | \- org.spark-project.hive.shims:hive-shims-0.23:jar:0.13.1a:runtime ... [INFO] +- org.spark-project.hive:hive-exec:jar:0.13.1a:compile [INFO] | +- org.spark-project.hive:hive-ant:jar:0.13.1a:compile [INFO] | | \- org.apache.velocity:velocity:jar:1.5:compile [INFO] | | \- oro:oro:jar:2.0.8:compile [INFO] | +- org.spark-project.hive:hive-common:jar:0.13.1a:compile ... [INFO] +- org.spark-project.hive:hive-serde:jar:0.13.1a:compile bq. is there a way to have the hive support without updating the assembly I don't think so. On Mon, Mar 2, 2015 at 12:37 PM, nitinkak001 nitinkak...@gmail.commailto:nitinkak...@gmail.com wrote: I want to run Hive query inside Spark and use the RDDs generated from that inside Spark. I read in the documentation /Hive support is enabled by adding the -Phive and -Phive-thriftserver flags to Spark’s build. This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive./ I just wanted to know what -Phive and -Phive-thriftserver flags really do and is there a way to have the hive support without updating the assembly. Does that flag add a hive support jar or something? The reason I am asking is that I will be using Cloudera version of Spark in future and I am not sure how to add the Hive support to that Spark distribution. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executing-hive-query-from-Spark-code-tp21880.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.orgmailto:user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.orgmailto:user-h...@spark.apache.org
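For completeness, once a Hive-enabled build (-Phive) is on the classpath, the basic usage looks roughly like this; the table and column names are placeholders and sc is an existing SparkContext:

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// In Spark 1.2 this returns a SchemaRDD, i.e. an ordinary RDD of Rows you can keep using in Spark.
val rows = hiveContext.sql("SELECT key, value FROM src WHERE key < 10")
rows.map(r => r.getInt(0)).collect()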
Re: External Data Source in Spark
Wouldn't it be possible with .saveAsNewAPIHadoopFile? How are you pushing the filters and projections currently? Thanks Best Regards On Tue, Mar 3, 2015 at 1:11 AM, Addanki, Santosh Kumar santosh.kumar.adda...@sap.com wrote: Hi Colleagues, Currently we have implemented the External Data Source API and are able to push filters and projections. Could you provide some info on how the joins could perhaps be pushed to the original data source if both data sources are from the same database? I briefly looked at DataSourceStrategy.scala but could not get far. Best Regards Santosh
Re: Architecture of Apache Spark SQL
Here is a description of the optimizer: https://docs.google.com/a/databricks.com/document/d/1Hc_Ehtr0G8SQUg69cmViZsMi55_Kf3tISD9GPGU5M1Y/edit On Mon, Mar 2, 2015 at 10:18 PM, Akhil Das ak...@sigmoidanalytics.com wrote: Here's the whole tech stack around it: [image: Inline image 1] For a bit more details you can refer this slide http://www.slideshare.net/jeykottalam/spark-sqlamp-camp2014?related=1 Previous project was Shark (SQL over spark), you can read about it from here http://databricks.com/blog/2014/07/01/shark-spark-sql-hive-on-spark-and-the-future-of-sql-on-spark.html Thanks Best Regards On Mon, Mar 2, 2015 at 3:18 PM, dubey_a abhishek.du...@xoriant.com wrote: What is the architecture of Apache Spark SQL? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Architecture-of-Apache-Spark-SQL-tp21869.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Is SQLContext thread-safe?
https://issues.apache.org/jira/browse/SPARK-2087 https://github.com/apache/spark/pull/4382 I am working on the prototype, but will be updated soon. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Tuesday, March 3, 2015 8:32 AM To: Cheng, Hao; user Subject: RE: Is SQLContext thread-safe? Hao, thank you so much for the reply! Do you already have some JIRA for the discussion? -Original Message- From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Tuesday, March 03, 2015 8:23 AM To: Haopu Wang; user Subject: RE: Is SQLContext thread-safe? Currently, each SQLContext has its own configuration, e.g. shuffle partition number, codegen etc. and it will be shared among the multiple threads running. We actually has some internal discussions on this, probably will provide a thread local configuration in the future for a single SQLContext instance. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Tuesday, March 3, 2015 7:56 AM To: Cheng, Hao; user Subject: RE: Is SQLContext thread-safe? Thanks for the response. Then I have another question: when will we want to create multiple SQLContext instances from the same SparkContext? What's the benefit? -Original Message- From: Cheng, Hao [mailto:hao.ch...@intel.com] Sent: Monday, March 02, 2015 9:05 PM To: Haopu Wang; user Subject: RE: Is SQLContext thread-safe? Yes it is thread safe, at least it's supposed to be. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Monday, March 2, 2015 4:43 PM To: user Subject: Is SQLContext thread-safe? Hi, is it safe to use the same SQLContext to do Select operations in different threads at the same time? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
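To illustrate the usage being discussed, something like the sketch below is the scenario in question: one SQLContext queried from several threads. Per the thread, this is expected to be safe, but the per-context configuration (shuffle partitions, codegen, etc.) is currently shared across all of those threads. The table name and the existing sqlContext are assumptions.

import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

// Two concurrent queries against the same SQLContext.
val f1 = Future { sqlContext.sql("SELECT count(*) FROM events").collect() }
val f2 = Future { sqlContext.sql("SELECT max(ts) FROM events").collect() }
Await.result(f1, 10.minutes)
Await.result(f2, 10.minutes)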
Exception while select into table.
Hi all, I was doing select using spark sql like: insert into table startup_log_uid_20150227 select * from bak_startup_log_uid_20150227 where login_time 1425027600 Usually, it got an exception: org.apache.hadoop.hive.ql.metadata.Hive.checkPaths(Hive.java:2157) org.apache.hadoop.hive.ql.metadata.Hive.copyFiles(Hive.java:2298) org.apache.hadoop.hive.ql.metadata.Table.copyFiles(Table.java:686) org.apache.hadoop.hive.ql.metadata.Hive.loadTable(Hive.java:1469) org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:243) org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:137) org.apache.spark.sql.execution.Command$class.execute(commands.scala:46) org.apache.spark.sql.hive.execution.InsertIntoHiveTable.execute(InsertIntoHiveTable.scala:51) org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:425) org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:425) org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58) org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:108) org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:94) com.nd.home99.LogsProcess$$anonfun$main$1$$anonfun$apply$1.apply(LogsProcess.scala:286) com.nd.home99.LogsProcess$$anonfun$main$1$$anonfun$apply$1.apply(LogsProcess.scala:83) scala.collection.immutable.List.foreach(List.scala:318) com.nd.home99.LogsProcess$$anonfun$main$1.apply(LogsProcess.scala:83) com.nd.home99.LogsProcess$$anonfun$main$1.apply(LogsProcess.scala:82) scala.collection.immutable.List.foreach(List.scala:318) com.nd.home99.LogsProcess$.main(LogsProcess.scala:82) com.nd.home99.LogsProcess.main(LogsProcess.scala) sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:601) org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:427) Are there any hints about this?
Re: Performance tuning in Spark SQL.
You have sent four questions that are very general in nature. They might be better answered if you googled for those topics: there is a wealth of materials available. 2015-03-02 2:01 GMT-08:00 dubey_a abhishek.du...@xoriant.com: What are the ways to tune query performance in Spark SQL? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-tuning-in-Spark-SQL-tp21871.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Executing hive query from Spark code
I want to run Hive query inside Spark and use the RDDs generated from that inside Spark. I read in the documentation /Hive support is enabled by adding the -Phive and -Phive-thriftserver flags to Spark’s build. This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries (SerDes) in order to access data stored in Hive./ I just wanted to know what -Phive and -Phive-thriftserver flags really do and is there a way to have the hive support without updating the assembly. Does that flag add a hive support jar or something? The reason I am asking is that I will be using Cloudera version of Spark in future and I am not sure how to add the Hive support to that Spark distribution. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Executing-hive-query-from-Spark-code-tp21880.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Dataframe v/s SparkSQL
Is it correct to say that Spark Dataframe APIs are implemented using same execution as SparkSQL ? In other words, while the dataframe API is different than SparkSQL, the runtime performance of equivalent constructs in Dataframe and SparkSQL should be same. So one should be able to choose whichever of the two (DF v/s SQL) suite the use cases and not worry about runtime performance. Pl comment ... Thanks,
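One way to check the "same execution" claim yourself (a sketch against the Spark 1.3 API; the table and column names are made up) is to compare the plans that the two front ends produce:

// Assumes an existing SQLContext `sqlContext` with a registered table `sales`.
val df = sqlContext.table("sales")
df.groupBy("region").count().explain()                                         // DataFrame DSL
sqlContext.sql("SELECT region, count(*) FROM sales GROUP BY region").explain() // SQL
// Both paths go through the same Catalyst logical plan and execution engine.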
Re: Workaround for spark 1.2.X roaringbitmap kryo problem?
I think this is a Java vs scala syntax issue. Will check. On Thu, Feb 26, 2015 at 8:17 PM, Arun Luthra arun.lut...@gmail.com wrote: Problem is noted here: https://issues.apache.org/jira/browse/SPARK-5949 I tried this as a workaround: import org.apache.spark.scheduler._ import org.roaringbitmap._ ... kryo.register(classOf[org.roaringbitmap.RoaringBitmap]) kryo.register(classOf[org.roaringbitmap.RoaringArray]) kryo.register(classOf[org.roaringbitmap.ArrayContainer]) kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element]) kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) kryo.register(classOf[short[]]) in build file: libraryDependencies += org.roaringbitmap % RoaringBitmap % 0.4.8 This fails to compile: ...:53: identifier expected but ']' found. [error] kryo.register(classOf[org.roaringbitmap.RoaringArray$Element[]]) also: :54: identifier expected but ']' found. [error] kryo.register(classOf[short[]]) also: :51: class HighlyCompressedMapStatus in package scheduler cannot be accessed in package org.apache.spark.scheduler [error] kryo.register(classOf[org.apache.spark.scheduler.HighlyCompressedMapStatus]) Suggestions? Arun
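On the Java-vs-Scala point: Scala spells array classes as classOf[Array[T]] rather than T[], and classes that are package-private or nested (like HighlyCompressedMapStatus and RoaringArray$Element) are easiest to reach with Class.forName. A sketch of the registrations rewritten that way, not verified against every RoaringBitmap version:

kryo.register(classOf[org.roaringbitmap.RoaringBitmap])
kryo.register(classOf[org.roaringbitmap.RoaringArray])
kryo.register(classOf[org.roaringbitmap.ArrayContainer])
kryo.register(classOf[Array[Short]])                                        // short[] in Java
kryo.register(Class.forName("org.roaringbitmap.RoaringArray$Element"))
kryo.register(Class.forName("[Lorg.roaringbitmap.RoaringArray$Element;"))   // Element[]
kryo.register(Class.forName("org.apache.spark.scheduler.HighlyCompressedMapStatus"))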
Re: Column Similarities using DIMSUM fails with GC overhead limit exceeded
Hi Sab, The current method is optimized for having many rows and few columns. In your case it is exactly the opposite. We are working on your case, tracked by this JIRA: https://issues.apache.org/jira/browse/SPARK-4823 Your case is very common, so I will put some time into building it. In the meantime, if you're looking for groups of similar points, consider using K-means - it will get you clusters of similar rows with Euclidean distance. Best, Reza On Sun, Mar 1, 2015 at 9:36 PM, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote: Hi Reza, I see that ((int, int), double) pairs are generated for any combination that meets the criteria controlled by the threshold. But assuming a simple 1 x 10K matrix, that means I would need at least 12 GB of memory per executor for the flat map just for these pairs, excluding any other overhead. Is that correct? How can we make this scale for even larger n (when m stays small), like 100 x 5 million? One way is by using higher thresholds. The other is that I use a SparseVector to begin with. Are there any other optimizations I can take advantage of? Thanks Sab
Re: Store DStreams into Hive using Hive Streaming
Please, if you have found a solution for this, could you post it? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Store-DStreams-into-Hive-using-Hive-Streaming-tp18307p21877.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: multiple sparkcontexts and streamingcontexts
thanks for the reply. Actually, our main problem is not really about sparkcontext, the problem is that spark does not allow to create streaming context dynamically, and once a stream is shut down, a new one cannot be created in the same sparkcontext. So we cannot create a service that would create and manage multiple streams - the same way that is possible with batch jobs. On Mon, Mar 2, 2015 at 2:52 PM, Sean Owen so...@cloudera.com wrote: I think everything there is to know about it is on JIRA; I don't think that's being worked on. On Mon, Mar 2, 2015 at 2:50 PM, Tamas Jambor jambo...@gmail.com wrote: I have seen there is a card (SPARK-2243) to enable that. Is that still going ahead? On Mon, Mar 2, 2015 at 2:46 PM, Sean Owen so...@cloudera.com wrote: It is still not something you're supposed to do; in fact there is a setting (disabled by default) that throws an exception if you try to make multiple contexts. On Mon, Mar 2, 2015 at 2:43 PM, jamborta jambo...@gmail.com wrote: hi all, what is the current status and direction on enabling multiple sparkcontexts and streamingcontext? I have seen a few issues open on JIRA, which seem to be there for quite a while. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-sparkcontexts-and-streamingcontexts-tp21876.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
RE: Performance tuning in Spark SQL.
This is actually quite an open question; from my understanding, there are several ways to tune, such as:
* SQL configurations (configuration key / default value):
  spark.sql.autoBroadcastJoinThreshold / 10 * 1024 * 1024
  spark.sql.defaultSizeInBytes / 10 * 1024 * 1024 + 1
  spark.sql.planner.externalSort / false
  spark.sql.shuffle.partitions / 200
  spark.sql.codegen / false
* Spark cluster / application configuration (memory, GC, Spark core number, etc.)
* Try using cached tables / Parquet files as the storage.
* EXPLAIN [EXTENDED] query is your best friend for tuning the SQL itself.
* ...
And a real use case scenario would probably be more helpful in answering your question. -Original Message- From: dubey_a [mailto:abhishek.du...@xoriant.com] Sent: Monday, March 2, 2015 6:02 PM To: user@spark.apache.org Subject: Performance tuning in Spark SQL. What are the ways to tune query performance in Spark SQL? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-tuning-in-Spark-SQL-tp21871.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
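To make the first and third bullets concrete, a sketch of how those knobs are typically applied from code; the values and the `logs` table are illustrative only, and sqlContext is assumed to exist:

sqlContext.setConf("spark.sql.shuffle.partitions", "400")
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (50 * 1024 * 1024).toString)
sqlContext.setConf("spark.sql.codegen", "true")
sqlContext.cacheTable("logs")   // columnar in-memory cache of a registered table
sqlContext.sql("EXPLAIN EXTENDED SELECT count(*) FROM logs").collect().foreach(println)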
Re: multiple sparkcontexts and streamingcontexts
I think everything there is to know about it is on JIRA; I don't think that's being worked on. On Mon, Mar 2, 2015 at 2:50 PM, Tamas Jambor jambo...@gmail.com wrote: I have seen there is a card (SPARK-2243) to enable that. Is that still going ahead? On Mon, Mar 2, 2015 at 2:46 PM, Sean Owen so...@cloudera.com wrote: It is still not something you're supposed to do; in fact there is a setting (disabled by default) that throws an exception if you try to make multiple contexts. On Mon, Mar 2, 2015 at 2:43 PM, jamborta jambo...@gmail.com wrote: hi all, what is the current status and direction on enabling multiple sparkcontexts and streamingcontext? I have seen a few issues open on JIRA, which seem to be there for quite a while. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-sparkcontexts-and-streamingcontexts-tp21876.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: multiple sparkcontexts and streamingcontexts
It is still not something you're supposed to do; in fact there is a setting (disabled by default) that throws an exception if you try to make multiple contexts. On Mon, Mar 2, 2015 at 2:43 PM, jamborta jambo...@gmail.com wrote: hi all, what is the current status and direction on enabling multiple sparkcontexts and streamingcontext? I have seen a few issues open on JIRA, which seem to be there for quite a while. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-sparkcontexts-and-streamingcontexts-tp21876.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: multiple sparkcontexts and streamingcontexts
I have seen there is a card (SPARK-2243) to enable that. Is that still going ahead? On Mon, Mar 2, 2015 at 2:46 PM, Sean Owen so...@cloudera.com wrote: It is still not something you're supposed to do; in fact there is a setting (disabled by default) that throws an exception if you try to make multiple contexts. On Mon, Mar 2, 2015 at 2:43 PM, jamborta jambo...@gmail.com wrote: hi all, what is the current status and direction on enabling multiple sparkcontexts and streamingcontext? I have seen a few issues open on JIRA, which seem to be there for quite a while. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-sparkcontexts-and-streamingcontexts-tp21876.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Performance tuning in Spark SQL.
What are the ways to tune query performance in Spark SQL? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Performance-tuning-in-Spark-SQL-tp21871.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
SQL Queries running on Schema RDD's in Spark SQL
How do the SQL queries actually break down across nodes and run on SchemaRDDs in the background? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SQL-Queries-running-on-Schema-RDD-s-in-Spark-SQL-tp21870.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Number of Executors per worker process
Hello! Thank you very much for your response. In the book Learning Spark I found out the following sentence: Each application will have at most one executor on each worker So worker can have one or none executor process spawned (perhaps the number depends on the workload distribution). Best regards, Florin On Thu, Feb 26, 2015 at 1:04 PM, Jeffrey Jedele jeffrey.jed...@gmail.com wrote: Hi Spico, Yes, I think an executor core in Spark is basically a thread in a worker pool. It's recommended to have one executor core per physical core on your machine for best performance, but I think in theory you can create as many threads as your OS allows. For deployment: There seems to be the actual worker JVM which coordinates the work on a worker node. I don't think the actual thread pool lives in there, but a separate JVM is created for each application that has cores allocated on the node. Otherwise it would be rather hard to impose memory limits on application level and it would have serious disadvantages regarding stability. You can check this behavior by looing at the processes on your machine: ps aux | grep spark.deploy = will show master, worker (coordinator) and driver JVMs ps aux | grep spark.executor = will show the actual worker JVMs 2015-02-25 14:23 GMT+01:00 Spico Florin spicoflo...@gmail.com: Hello! I've read the documentation about the spark architecture, I have the following questions: 1: how many executors can be on a single worker process (JMV)? 2:Should I think executor like a Java Thread Executor where the pool size is equal with the number of the given cores (set up by the SPARK_WORKER_CORES)? 3. If the worker can have many executors, how this is handled by the Spark? Or can I handle by myself to set up the number of executors per JVM? I look forward for your answers. Regards, Florin
RE: Is SQLContext thread-safe?
Yes it is thread safe, at least it's supposed to be. -Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Monday, March 2, 2015 4:43 PM To: user Subject: Is SQLContext thread-safe? Hi, is it safe to use the same SQLContext to do Select operations in different threads at the same time? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Is SQLContext thread-safe?
Hi, is it safe to use the same SQLContext to do Select operations in different threads at the same time? Thank you very much! - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Combiners in Spark
Which is the equivalent of MapReduce's Combiners in Spark? I guess that it's combineByKey, but is combineByKey executed locally? I understand that functions such as reduceByKey or foldByKey aren't executed locally. Reading the documentation, it looks like combineByKey is equivalent to reduceByKey except that with combineByKey you can specify a different output type than the input you have. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: documentation - graphx-programming-guide error?
You are correct in that the type of messages being sent in that example is String and so reduceFun must operate on String. Being just an example, it can do any reasonable combining of messages. How about a + + b? Or the message could be changed to an Int. The mapReduceTriplets example above appears to have the same problem. I think it's worth opening a PR + JIRA for the fix. On Mon, Mar 2, 2015 at 7:12 AM, Deborah Siegel deborah.sie...@gmail.com wrote: Hello, I am running through examples given on http://spark.apache.org/docs/1.2.1/graphx-programming-guide.html The section for Map Reduce Triplets Transition Guide (Legacy) indicates that one can run the following .aggregateMessages code val graph: Graph[Int, Float] = ... def msgFun(triplet: EdgeContext[Int, Float, String]) { triplet.sendToDst(Hi) } def reduceFun(a: Int, b: Int): Int = a + b val result = graph.aggregateMessages[String](msgFun, reduceFun) I created a graph of the indicated type, and get an error scala val result = graph.aggregateMessages[String](msgFun, reduceFun) console:23: error: type mismatch; found : Int required: String Error occurred in an application involving default arguments. val result = graph.aggregateMessages[String](msgFun, reduceFun) ^ What is this example supposed to do? The following would work, although I'll admit I am perplexed by the example's intent. def msgFun(triplet: EdgeContext[Int, Float, (Int,String)]) { triplet.sendToDst(1, Hi) } def reduceFun(a: (Int,String), b: (Int,String)): (Int,String) = ((a._1 + b._1),a._2) val result = graph.aggregateMessages[(Int,String)](msgFun, reduceFun) Sincerely, Deb - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
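For reference, the string literals in the snippets above appear to have lost their quotes in the archive. With them restored, and with the reduce function operating on String as suggested (e.g. a + " " + b), the example would look roughly like:

import org.apache.spark.graphx._

// graph: Graph[Int, Float], as in the guide's example
def msgFun(triplet: EdgeContext[Int, Float, String]): Unit = {
  triplet.sendToDst("Hi")
}
def reduceFun(a: String, b: String): String = a + " " + b

val result: VertexRDD[String] = graph.aggregateMessages[String](msgFun, reduceFun)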
Re: SparkSQL Timestamp query failure
Can you please post how you overcame this issue? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-Timestamp-query-failure-tp19502p21868.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Best practices for query creation in Spark SQL.
Are there any best practices for schema design and query creation in Spark SQL? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Best-practices-for-query-creation-in-Spark-SQL-tp21872.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Connection pool in workers
Thanks Chris, That is what I wanted to know :) A.K.M. Ashrafuzzaman Lead Software Engineer NewsCred (M) 880-175-5592433 Twitter | Blog | Facebook Check out The Academy, your #1 source for free content marketing resources On Mar 2, 2015, at 2:04 AM, Chris Fregly ch...@fregly.com wrote: hey AKM! this is a very common problem. the streaming programming guide addresses this issue here, actually: http://spark.apache.org/docs/1.2.0/streaming-programming-guide.html#design-patterns-for-using-foreachrdd the tl;dr is this: 1) you want to use foreachPartition() to operate on a whole partition versus a single record with foreachRDD() 2) you want to get/release the ConnectionPool within each worker 3) make sure you initialize the ConnectionPool first - or do it lazily upon getting the first connection. here's the sample code referenced in the link above with some additional comments for clarity: dstream.foreachRDD { rdd = // everything within here runs on the Driver rdd.foreachPartition { partitionOfRecords = // everything within here runs on the Worker and operates on a partition of records // ConnectionPool is a static, lazily initialized singleton pool of connections that runs within the Worker JVM // retrieve a connection from the pool val connection = ConnectionPool.getConnection() // perform the application logic here - parse and write to mongodb using the connection partitionOfRecords.foreach(record = connection.send(record)) // return to the pool for future reuse ConnectionPool.returnConnection(connection) } } hope that helps! -chris On Sun, Mar 1, 2015 at 4:00 AM, A.K.M. Ashrafuzzaman ashrafuzzaman...@gmail.com wrote: Sorry guys may bad, Here is a high level code sample, val unionStreams = ssc.union(kinesisStreams) unionStreams.foreachRDD(rdd = { rdd.foreach(tweet = val strTweet = new String(tweet, UTF-8) val interaction = InteractionParser.parser(strTweet) interactionDAL.insert(interaction) ) }) Here I have to close the connection for interactionDAL other wise the JVM gives me error that the connection is open. I tried with sticky connection as well with keep_alive true. So my guess was that at the point of “unionStreams.foreachRDD” or at “rdd.foreach” the code is marshaled and send to workers and workers un-marshals and execute the process, which is why the connection is alway opened for each RDD. I might be completely wrong. I would love to know what is going on underneath. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
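Since the archive also dropped the => arrows from the snippet above, here it is restored; ConnectionPool is still a stand-in for whatever pooling library is actually used (it is not a Spark API):

dstream.foreachRDD { rdd =>
  // everything in here runs on the driver
  rdd.foreachPartition { partitionOfRecords =>
    // runs on a worker, once per partition
    val connection = ConnectionPool.getConnection()   // lazily initialized singleton pool in the worker JVM
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)       // return to the pool for future reuse
  }
}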
Architecture of Apache Spark SQL
What is the architecture of Apache Spark SQL? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Architecture-of-Apache-Spark-SQL-tp21869.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Best practices for query creation in Spark SQL.
Hi, I think your chances for a satisfying answer would increase dramatically if you elaborated a bit more on what you actually want to know. (Holds for any of your last four questions about Spark SQL...) Tobias
GraphX path traversal
Hi, I have the edge list below. How can I find the parent path for every vertex? Example:
Vertex 1 path: 2, 3, 4, 5, 6
Vertex 2 path: 3, 4, 5, 6
Vertex 3 path: 4, 5, 6
Vertex 4 path: 5, 6
Vertex 5 path: 6
Could you please let me know how to do this, or make any suggestion?
Source Vertex -> Destination Vertex:
1 -> 2
2 -> 3
3 -> 4
4 -> 5
5 -> 6
Regards, Rajesh
Re: Combiners in Spark
I think the simplest answer is that it's not really a separate concept from the 'reduce' function, because Spark's API is a sort of simpler, purer form of FP. It is just the same function that can be applied at many points in an aggregation -- both map side (a la Combiners in MapReduce) or reduce side (a la Reducers in MapReduce). In MapReduce even, you could often use the same function for combine and reduce. They were separate mostly because the semantics for what happened in Reducer were different; this would map to more than just reduceByKey in Spark. These various ByKey operations build on combineByKey, yes. Despite its name, mergeCombiners is not only a Combiner-style function. It's the reduce function, applied in several places. You can control whether it is applied map-side or not, but it is by default. So combiners are pretty automatic in Spark. On Mon, Mar 2, 2015 at 10:55 AM, Guillermo Ortiz konstt2...@gmail.com wrote: Which is the equivalent function to Combiners of MapReduce in Spark? I guess that it's combineByKey, but is combineByKey executed locally? I understand than functions as reduceByKey or foldByKey aren't executed locally. Reading the documentation looks like combineByKey is equivalent to reduceByKey just that combineByKey you can specify an different output than the input you have. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
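To make the relationship concrete, a small sketch computing per-key averages; mergeCombiners is the function that plays the combiner role, being applied both map-side and reduce-side (sc is an existing SparkContext):

val scores = sc.parallelize(Seq(("a", 1.0), ("b", 2.0), ("a", 3.0), ("b", 4.0)))

val sumCount = scores.combineByKey(
  (v: Double) => (v, 1),                                              // createCombiner
  (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue (map side)
  (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners (map and reduce side)
)
val averages = sumCount.mapValues { case (sum, count) => sum / count }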
Re: multiple sparkcontexts and streamingcontexts
there are some “hidden” APIs potentially addressing your problem (but with a bit complexity) by using the Actor Receiver, you can tell the supervisor of the actor receiver create another actor receiver for you, the ActorRef of the newly created Actor will be sent to the caller of the API (in most of cases, that’s one of the existing actor receivers) The limitation might be that, all receivers are on the same machine... Here is a PR trying to expose the APIs to the user: https://github.com/apache/spark/pull/3984 Best, -- Nan Zhu http://codingcat.me On Monday, March 2, 2015 at 10:19 AM, Tamas Jambor wrote: Sorry, I meant once the stream is started, it's not possible to create new streams in the existing streaming context, and it's not possible to create new streaming context if another one is already running. So the only feasible option seemed to create new sparkcontexts for each stream (tried using spark-jobserver for that). On Mon, Mar 2, 2015 at 3:07 PM, Sean Owen so...@cloudera.com (mailto:so...@cloudera.com) wrote: You can make a new StreamingContext on an existing SparkContext, I believe? On Mon, Mar 2, 2015 at 3:01 PM, Tamas Jambor jambo...@gmail.com (mailto:jambo...@gmail.com) wrote: thanks for the reply. Actually, our main problem is not really about sparkcontext, the problem is that spark does not allow to create streaming context dynamically, and once a stream is shut down, a new one cannot be created in the same sparkcontext. So we cannot create a service that would create and manage multiple streams - the same way that is possible with batch jobs. On Mon, Mar 2, 2015 at 2:52 PM, Sean Owen so...@cloudera.com (mailto:so...@cloudera.com) wrote: I think everything there is to know about it is on JIRA; I don't think that's being worked on. On Mon, Mar 2, 2015 at 2:50 PM, Tamas Jambor jambo...@gmail.com (mailto:jambo...@gmail.com) wrote: I have seen there is a card (SPARK-2243) to enable that. Is that still going ahead? On Mon, Mar 2, 2015 at 2:46 PM, Sean Owen so...@cloudera.com (mailto:so...@cloudera.com) wrote: It is still not something you're supposed to do; in fact there is a setting (disabled by default) that throws an exception if you try to make multiple contexts. On Mon, Mar 2, 2015 at 2:43 PM, jamborta jambo...@gmail.com (mailto:jambo...@gmail.com) wrote: hi all, what is the current status and direction on enabling multiple sparkcontexts and streamingcontext? I have seen a few issues open on JIRA, which seem to be there for quite a while. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-sparkcontexts-and-streamingcontexts-tp21876.html Sent from the Apache Spark User List mailing list archive at Nabble.com (http://Nabble.com). - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org (mailto:user-unsubscr...@spark.apache.org) For additional commands, e-mail: user-h...@spark.apache.org (mailto:user-h...@spark.apache.org)
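For completeness, the constructor Sean mentions in the quoted exchange - creating a StreamingContext on top of an already-running SparkContext - looks roughly like this. It is only a sketch; the batch interval and the stream definitions are illustrative.

import org.apache.spark.SparkContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

def runOneStream(sc: SparkContext): Unit = {
  val ssc = new StreamingContext(sc, Seconds(10))   // reuse the existing SparkContext
  // ... define your DStreams on ssc here ...
  ssc.start()
  ssc.awaitTermination()
  // Stopping only the StreamingContext keeps the SparkContext alive,
  // so another StreamingContext can later be created on it:
  //   ssc.stop(stopSparkContext = false)
}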
Re: Scalable JDBCRDD
Have you already tried using the Vertica hadoop input format with spark? I don't know how it's implemented, but I'd hope that it has some notion of vertica-specific shard locality (which JdbcRDD does not). If you're really constrained to consuming the result set in a single thread, whatever processing you're doing of the results must be time-consuming enough to make the overhead of distributing it in a spark job still worthwhile? I guess you might take a look at doing a custom DStream receiver that iterates over the result set and makes micro-batches out of it. On Sun, Mar 1, 2015 at 9:59 AM, michal.klo...@gmail.com michal.klo...@gmail.com wrote: Yes exactly. The temp table is an approach but then we need to manage the deletion of it etc. I'm sure we won't be the only people with this crazy use case. If there isn't a feasible way to do this within the framework then that's okay. But if there is a way we are happy to write the code and PR it back :) M On Mar 1, 2015, at 10:02 AM, eric e...@ericjbell.com wrote: What you're saying is that, due to the intensity of the query, you need to run a single query and partition the results, versus running one query for each partition. I assume it's not viable to throw the query results into another table in your database and then query that using the normal approach? --eric On 3/1/15 4:28 AM, michal.klo...@gmail.com wrote: Jorn: Vertica Cody: I posited the limit just as an example of how jdbcrdd could be used least invasively. Let's say we used a partition on a time field -- we would still need to have N executions of those queries. The queries we have are very intense and concurrency is an issue even if the the N partitioned queries are smaller. Some queries require evaluating the whole data set first. If our use case a simple select * from table.. Then the partitions would be an easier sell if it wasn't for the concurrency problem :) Long story short -- we need only one execution of the query and would like to just divy out the result set. M On Mar 1, 2015, at 5:18 AM, Jörn Franke jornfra...@gmail.com wrote: What database are you using? Le 28 févr. 2015 18:15, Michal Klos michal.klo...@gmail.com a écrit : Hi Spark community, We have a use case where we need to pull huge amounts of data from a SQL query against a database into Spark. We need to execute the query against our huge database and not a substitute (SparkSQL, Hive, etc) because of a couple of factors including custom functions used in the queries that only our database has. We started by looking at JDBC RDD, which utilizes a prepared statement with two parameters that are meant to be used to partition the result set to the workers... e.g.: select * from table limit ?,? turns into select * from table limit 1,100 on worker 1 select * from table limit 101,200 on worker 2 This will not work for us because our database cannot support multiple execution of these queries without being crippled. But, additionally, our database doesn't support the above LIMIT syntax and we don't have a generic way of partitioning the various queries. As a result -- we stated by forking JDBCRDD and made a version that executes the SQL query once in getPartitions into a Vector and then hands each worker node an index and iterator. 
Here's a snippet of getPartitions and compute:

override def getPartitions: Array[Partition] = {
  // Compute the DB query once here
  val results = computeQuery
  (0 until numPartitions).map(i => {
    // TODO: would be better to do this partitioning when scrolling through the result set, if still loading into memory
    val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector
    new DBPartition(i, partitionItems)
  }).toArray
}

override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
  val part = thePart.asInstanceOf[DBPartition[T]]
  // Shift the result vector to our index number and then do a sliding iterator over it
  val iterator = part.items.iterator

  override def getNext: T = {
    if (iterator.hasNext) {
      iterator.next()
    } else {
      finished = true
      null.asInstanceOf[T]
    }
  }

  override def close: Unit = ()
}

This is a little better since we can just execute the query once. However, the result set needs to fit in memory. We've been trying to brainstorm a way to A) have that result set distributed out to the worker RDD partitions as it's streaming in from the cursor? B) have the result set spill to disk if it exceeds memory and do something clever around the iterators? C) something else? We're not familiar enough yet with all of the workings of Spark to know how to proceed on this. We also thought of the work-around of having the DB query dump to HDFS/S3 and then pick it up from there, but it adds more moving parts and latency to our
Re: multiple sparkcontexts and streamingcontexts
You can make a new StreamingContext on an existing SparkContext, I believe? On Mon, Mar 2, 2015 at 3:01 PM, Tamas Jambor jambo...@gmail.com wrote: thanks for the reply. Actually, our main problem is not really about sparkcontext, the problem is that spark does not allow to create streaming context dynamically, and once a stream is shut down, a new one cannot be created in the same sparkcontext. So we cannot create a service that would create and manage multiple streams - the same way that is possible with batch jobs. On Mon, Mar 2, 2015 at 2:52 PM, Sean Owen so...@cloudera.com wrote: I think everything there is to know about it is on JIRA; I don't think that's being worked on. On Mon, Mar 2, 2015 at 2:50 PM, Tamas Jambor jambo...@gmail.com wrote: I have seen there is a card (SPARK-2243) to enable that. Is that still going ahead? On Mon, Mar 2, 2015 at 2:46 PM, Sean Owen so...@cloudera.com wrote: It is still not something you're supposed to do; in fact there is a setting (disabled by default) that throws an exception if you try to make multiple contexts. On Mon, Mar 2, 2015 at 2:43 PM, jamborta jambo...@gmail.com wrote: hi all, what is the current status and direction on enabling multiple sparkcontexts and streamingcontext? I have seen a few issues open on JIRA, which seem to be there for quite a while. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-sparkcontexts-and-streamingcontexts-tp21876.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: multiple sparkcontexts and streamingcontexts
Sorry, I meant once the stream is started, it's not possible to create new streams in the existing streaming context, and it's not possible to create new streaming context if another one is already running. So the only feasible option seemed to create new sparkcontexts for each stream (tried using spark-jobserver for that). On Mon, Mar 2, 2015 at 3:07 PM, Sean Owen so...@cloudera.com wrote: You can make a new StreamingContext on an existing SparkContext, I believe? On Mon, Mar 2, 2015 at 3:01 PM, Tamas Jambor jambo...@gmail.com wrote: thanks for the reply. Actually, our main problem is not really about sparkcontext, the problem is that spark does not allow to create streaming context dynamically, and once a stream is shut down, a new one cannot be created in the same sparkcontext. So we cannot create a service that would create and manage multiple streams - the same way that is possible with batch jobs. On Mon, Mar 2, 2015 at 2:52 PM, Sean Owen so...@cloudera.com wrote: I think everything there is to know about it is on JIRA; I don't think that's being worked on. On Mon, Mar 2, 2015 at 2:50 PM, Tamas Jambor jambo...@gmail.com wrote: I have seen there is a card (SPARK-2243) to enable that. Is that still going ahead? On Mon, Mar 2, 2015 at 2:46 PM, Sean Owen so...@cloudera.com wrote: It is still not something you're supposed to do; in fact there is a setting (disabled by default) that throws an exception if you try to make multiple contexts. On Mon, Mar 2, 2015 at 2:43 PM, jamborta jambo...@gmail.com wrote: hi all, what is the current status and direction on enabling multiple sparkcontexts and streamingcontext? I have seen a few issues open on JIRA, which seem to be there for quite a while. thanks, -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/multiple-sparkcontexts-and-streamingcontexts-tp21876.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Scalable JDBCRDD
Hi Cody, Thanks for the reply. Yea, we thought of possibly doing this in a UDX in Vertica somehow to get the lower level co-operation but its a bit daunting. We want to do this because there are things we want to do with the result-set in Spark that are not possible in Vertica. The DStream receiver is a good thought. I think right now, we are learning towards eric's suggestion -- where we run the big query once somewhere (getPartitions maybe) in Vertica and dumps into a temporary table with an additional generated partition_key field. Then we have the workers issue N select * from temp-table where partition_key = ? queries that are hopefully lightweight. The temporary table we are hoping will just clean itself up so we don't need to handle that mess. It seems like the most sane approach today ;] m On Mon, Mar 2, 2015 at 10:51 AM, Cody Koeninger c...@koeninger.org wrote: Have you already tried using the Vertica hadoop input format with spark? I don't know how it's implemented, but I'd hope that it has some notion of vertica-specific shard locality (which JdbcRDD does not). If you're really constrained to consuming the result set in a single thread, whatever processing you're doing of the results must be time-consuming enough to make the overhead of distributing it in a spark job still worthwhile? I guess you might take a look at doing a custom DStream receiver that iterates over the result set and makes micro-batches out of it. On Sun, Mar 1, 2015 at 9:59 AM, michal.klo...@gmail.com michal.klo...@gmail.com wrote: Yes exactly. The temp table is an approach but then we need to manage the deletion of it etc. I'm sure we won't be the only people with this crazy use case. If there isn't a feasible way to do this within the framework then that's okay. But if there is a way we are happy to write the code and PR it back :) M On Mar 1, 2015, at 10:02 AM, eric e...@ericjbell.com wrote: What you're saying is that, due to the intensity of the query, you need to run a single query and partition the results, versus running one query for each partition. I assume it's not viable to throw the query results into another table in your database and then query that using the normal approach? --eric On 3/1/15 4:28 AM, michal.klo...@gmail.com wrote: Jorn: Vertica Cody: I posited the limit just as an example of how jdbcrdd could be used least invasively. Let's say we used a partition on a time field -- we would still need to have N executions of those queries. The queries we have are very intense and concurrency is an issue even if the the N partitioned queries are smaller. Some queries require evaluating the whole data set first. If our use case a simple select * from table.. Then the partitions would be an easier sell if it wasn't for the concurrency problem :) Long story short -- we need only one execution of the query and would like to just divy out the result set. M On Mar 1, 2015, at 5:18 AM, Jörn Franke jornfra...@gmail.com wrote: What database are you using? Le 28 févr. 2015 18:15, Michal Klos michal.klo...@gmail.com a écrit : Hi Spark community, We have a use case where we need to pull huge amounts of data from a SQL query against a database into Spark. We need to execute the query against our huge database and not a substitute (SparkSQL, Hive, etc) because of a couple of factors including custom functions used in the queries that only our database has. 
We started by looking at JDBC RDD, which utilizes a prepared statement with two parameters that are meant to be used to partition the result set to the workers... e.g.: select * from table limit ?,? turns into select * from table limit 1,100 on worker 1 select * from table limit 101,200 on worker 2 This will not work for us because our database cannot support multiple execution of these queries without being crippled. But, additionally, our database doesn't support the above LIMIT syntax and we don't have a generic way of partitioning the various queries. As a result -- we stated by forking JDBCRDD and made a version that executes the SQL query once in getPartitions into a Vector and then hands each worker node an index and iterator. Here's a snippet of getPartitions and compute: override def getPartitions: Array[Partition] = { //Compute the DB query once here val results = computeQuery (0 until numPartitions).map(i = { // TODO: would be better to do this partitioning when scrolling through result set if still loading into memory val partitionItems = results.drop(i).sliding(1, numPartitions).flatten.toVector new DBPartition(i, partitionItems) }).toArray } override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] { val part = thePart.asInstanceOf[DBPartition[T]] //Shift the result vector to our index number and then do a sliding iterator over it val iterator =
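If the temp-table route discussed above works out, the N lightweight per-partition queries map naturally onto the stock JdbcRDD, since its two placeholders become the bounds of the partition_key range. This is only a sketch; the connection string, table name, and row mapping are illustrative assumptions.

import java.sql.{DriverManager, ResultSet}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD

def readPartitioned(sc: SparkContext, numPartitions: Int) = {
  new JdbcRDD(
    sc,
    // one JDBC connection per partition/task
    () => DriverManager.getConnection("jdbc:vertica://host:5433/db", "user", "pass"),
    // the two '?' are bound to the low/high end of each partition's key range
    "SELECT * FROM temp_results WHERE partition_key >= ? AND partition_key <= ?",
    lowerBound = 0L,
    upperBound = (numPartitions - 1).toLong,
    numPartitions = numPartitions,
    mapRow = (rs: ResultSet) => rs.getString(1)   // adapt to your actual row shape
  )
}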
Re: java.util.NoSuchElementException: key not found:
aha ok, thanks. If I create different RDDs from a parent RDD and force evaluation thread-by-thread, then it should presumably be fine, correct? Or do I need to checkpoint the child RDDs as a precaution in case it needs to be removed from memory and recomputed? On Sat, Feb 28, 2015 at 4:28 AM, Shixiong Zhu zsxw...@gmail.com wrote: RDD is not thread-safe. You should not use it in multiple threads. Best Regards, Shixiong Zhu 2015-02-27 23:14 GMT+08:00 rok rokros...@gmail.com: I'm seeing this java.util.NoSuchElementException: key not found: exception pop up sometimes when I run operations on an RDD from multiple threads in a python application. It ends up shutting down the SparkContext so I'm assuming this is a bug -- from what I understand, I should be able to run operations on the same RDD from multiple threads or is this not recommended? I can't reproduce it all the time and I've tried eliminating caching wherever possible to see if that would have an effect, but it doesn't seem to. Each thread first splits the base RDD and then runs the LogisticRegressionWithSGD on the subset. Is there a workaround to this exception? -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/java-util-NoSuchElementException-key-not-found-tp21848.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: bitten by spark.yarn.executor.memoryOverhead
bq. that 0.1 is always enough? The answer is: it depends (on use cases). The value of 0.1 has been validated by several users. I think it is a reasonable default. Cheers On Mon, Mar 2, 2015 at 8:36 AM, Ryan Williams ryan.blake.willi...@gmail.com wrote: For reference, the initial version of #3525 https://github.com/apache/spark/pull/3525 (still open) made this fraction a configurable value, but consensus went against that being desirable so I removed it and marked SPARK-4665 https://issues.apache.org/jira/browse/SPARK-4665 as won't fix. My team wasted a lot of time on this failure mode as well and has settled in to passing --conf spark.yarn.executor.memoryOverhead=1024 to most jobs (that works out to 10-20% of --executor-memory, depending on the job). I agree that learning about this the hard way is a weak part of the Spark-on-YARN onboarding experience. The fact that our instinct here is to increase the 0.07 minimum instead of the alternate 384MB https://github.com/apache/spark/blob/3efd8bb6cf139ce094ff631c7a9c1eb93fdcd566/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala#L93 minimum seems like evidence that the fraction is the thing we should allow people to configure, instead of absolute amount that is currently configurable. Finally, do we feel confident that 0.1 is always enough? On Sat, Feb 28, 2015 at 4:51 PM Corey Nolet cjno...@gmail.com wrote: Thanks for taking this on Ted! On Sat, Feb 28, 2015 at 4:17 PM, Ted Yu yuzhih...@gmail.com wrote: I have created SPARK-6085 with pull request: https://github.com/apache/spark/pull/4836 Cheers On Sat, Feb 28, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote: +1 to a better default as well. We were working find until we ran against a real dataset which was much larger than the test dataset we were using locally. It took me a couple days and digging through many logs to figure out this value was what was causing the problem. On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote: Having good out-of-box experience is desirable. +1 on increasing the default. On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote: There was a recent discussion about whether to increase or indeed make configurable this kind of default fraction. I believe the suggestion there too was that 9-10% is a safer default. Advanced users can lower the resulting overhead value; it may still have to be increased in some cases, but a fatter default may make this kind of surprise less frequent. I'd support increasing the default; any other thoughts? On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote: hey, running my first map-red like (meaning disk-to-disk, avoiding in memory RDDs) computation in spark on yarn i immediately got bitten by a too low spark.yarn.executor.memoryOverhead. however it took me about an hour to find out this was the cause. at first i observed failing shuffles leading to restarting of tasks, then i realized this was because executors could not be reached, then i noticed in containers got shut down and reallocated in resourcemanager logs (no mention of errors, it seemed the containers finished their business and shut down successfully), and finally i found the reason in nodemanager logs. i dont think this is a pleasent first experience. i realize spark.yarn.executor.memoryOverhead needs to be set differently from situation to situation. 
but shouldnt the default be a somewhat higher value so that these errors are unlikely, and then the experts that are willing to deal with these errors can tune it lower? so why not make the default 10% instead of 7%? that gives something that works in most situations out of the box (at the cost of being a little wasteful). it worked for me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
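For anyone hitting this thread later, the work-around described above is just a config override. The 1024 MB figure below is the one quoted in the thread, not a general recommendation; tune it per job.

# on the command line
spark-submit --conf spark.yarn.executor.memoryOverhead=1024 ...

# or persistently, in conf/spark-defaults.conf
spark.yarn.executor.memoryOverhead   1024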
Re: Upgrade to Spark 1.2.1 using Guava
Marcelo’s work-around works. So if you are using the itemsimilarity stuff, the CLI has a way to solve the class not found and I can point out how to do the equivalent if you are using the library API. Ping me if you care. On Feb 28, 2015, at 2:27 PM, Erlend Hamnaberg erl...@hamnaberg.net wrote: Yes. I ran into this problem with mahout snapshot and spark 1.2.0 not really trying to figure out why that was a problem, since there were already too many moving parts in my app. Obviously there is a classpath issue somewhere. /Erlend On 27 Feb 2015 22:30, Pat Ferrel p...@occamsmachete.com mailto:p...@occamsmachete.com wrote: @Erlend hah, we were trying to merge your PR and ran into this—small world. You actually compile the JavaSerializer source in your project? @Marcelo do you mean by modifying spark.executor.extraClassPath on all workers, that didn’t seem to work? On Feb 27, 2015, at 1:23 PM, Erlend Hamnaberg erl...@hamnaberg.net mailto:erl...@hamnaberg.net wrote: Hi. I have had a simliar issue. I had to pull the JavaSerializer source into my own project, just so I got the classloading of this class under control. This must be a class loader issue with spark. -E On Fri, Feb 27, 2015 at 8:52 PM, Pat Ferrel p...@occamsmachete.com mailto:p...@occamsmachete.com wrote: I understand that I need to supply Guava to Spark. The HashBiMap is created in the client and broadcast to the workers. So it is needed in both. To achieve this there is a deps.jar with Guava (and Scopt but that is only for the client). Scopt is found so I know the jar is fine for the client. I pass in the deps.jar to the context creation code. I’ve checked the content of the jar and have verified that it is used at context creation time. I register the serializer as follows: class MyKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo) = { val h: HashBiMap[String, Int] = HashBiMap.create[String, Int]() //kryo.addDefaultSerializer(h.getClass, new JavaSerializer()) log.info http://log.info/(\n\n\nRegister Serializer for + h.getClass.getCanonicalName + \n\n\n) // just to be sure this does indeed get logged kryo.register(classOf[com.google.common.collect.HashBiMap[String, Int]], new JavaSerializer()) } } The job proceeds up until the broadcast value, a HashBiMap, is deserialized, which is where I get the following error. Have I missed a step for deserialization of broadcast values? Odd that serialization happened but deserialization failed. I’m running on a standalone localhost-only cluster. 15/02/27 11:40:34 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 4.0 (TID 9, 192.168.0.2): java.io.IOException: com.esotericsoftware.kryo.KryoException: Error during Java deserialization. 
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1093) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:95) at my.TDIndexedDatasetReader$$anonfun$5.apply(TextDelimitedReaderWriter.scala:94) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:366) at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: com.esotericsoftware.kryo.KryoException: Error during Java deserialization. at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:732) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:144) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216) at
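For readability, here is the registrator quoted above reformatted as a standalone sketch. It assumes Guava and Kryo's JavaSerializer are available on both the driver and executor classpaths, and in practice the registrator would be referenced by its fully-qualified class name.

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import com.google.common.collect.HashBiMap
import org.apache.spark.serializer.KryoRegistrator

class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    // Fall back to plain Java serialization for Guava's HashBiMap,
    // which Kryo cannot handle directly.
    kryo.register(classOf[HashBiMap[String, Int]], new JavaSerializer())
  }
}

// typically enabled via configuration:
//   spark.serializer        org.apache.spark.serializer.KryoSerializer
//   spark.kryo.registrator  MyKryoRegistrator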
Re: bitten by spark.yarn.executor.memoryOverhead
The problem is, you're left with two competing options then. You can go through the process of deprecating the absolute one and removing it eventually. You take away ability to set this value directly though, meaning you'd have to set absolute values by depending on a % of what you set your app memory too. I think there's non-trivial downside that way too. No value can always be right, or else it wouldn't be configurable. I think of this one like any other param that's set in absolute terms, but with an attempt to be smart about the default. On Mon, Mar 2, 2015 at 4:36 PM, Ryan Williams ryan.blake.willi...@gmail.com wrote: For reference, the initial version of #3525 (still open) made this fraction a configurable value, but consensus went against that being desirable so I removed it and marked SPARK-4665 as won't fix. My team wasted a lot of time on this failure mode as well and has settled in to passing --conf spark.yarn.executor.memoryOverhead=1024 to most jobs (that works out to 10-20% of --executor-memory, depending on the job). I agree that learning about this the hard way is a weak part of the Spark-on-YARN onboarding experience. The fact that our instinct here is to increase the 0.07 minimum instead of the alternate 384MB minimum seems like evidence that the fraction is the thing we should allow people to configure, instead of absolute amount that is currently configurable. Finally, do we feel confident that 0.1 is always enough? On Sat, Feb 28, 2015 at 4:51 PM Corey Nolet cjno...@gmail.com wrote: Thanks for taking this on Ted! On Sat, Feb 28, 2015 at 4:17 PM, Ted Yu yuzhih...@gmail.com wrote: I have created SPARK-6085 with pull request: https://github.com/apache/spark/pull/4836 Cheers On Sat, Feb 28, 2015 at 12:08 PM, Corey Nolet cjno...@gmail.com wrote: +1 to a better default as well. We were working find until we ran against a real dataset which was much larger than the test dataset we were using locally. It took me a couple days and digging through many logs to figure out this value was what was causing the problem. On Sat, Feb 28, 2015 at 11:38 AM, Ted Yu yuzhih...@gmail.com wrote: Having good out-of-box experience is desirable. +1 on increasing the default. On Sat, Feb 28, 2015 at 8:27 AM, Sean Owen so...@cloudera.com wrote: There was a recent discussion about whether to increase or indeed make configurable this kind of default fraction. I believe the suggestion there too was that 9-10% is a safer default. Advanced users can lower the resulting overhead value; it may still have to be increased in some cases, but a fatter default may make this kind of surprise less frequent. I'd support increasing the default; any other thoughts? On Sat, Feb 28, 2015 at 3:34 PM, Koert Kuipers ko...@tresata.com wrote: hey, running my first map-red like (meaning disk-to-disk, avoiding in memory RDDs) computation in spark on yarn i immediately got bitten by a too low spark.yarn.executor.memoryOverhead. however it took me about an hour to find out this was the cause. at first i observed failing shuffles leading to restarting of tasks, then i realized this was because executors could not be reached, then i noticed in containers got shut down and reallocated in resourcemanager logs (no mention of errors, it seemed the containers finished their business and shut down successfully), and finally i found the reason in nodemanager logs. i dont think this is a pleasent first experience. i realize spark.yarn.executor.memoryOverhead needs to be set differently from situation to situation. 
but shouldnt the default be a somewhat higher value so that these errors are unlikely, and then the experts that are willing to deal with these errors can tune it lower? so why not make the default 10% instead of 7%? that gives something that works in most situations out of the box (at the cost of being a little wasteful). it worked for me. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is SparkSQL optimizer aware of the needed data after the query?
-dev +user No, lambda functions and other code are black-boxes to Spark SQL. If you want those kinds of optimizations you need to express the columns required in either SQL or the DataFrame DSL (coming in 1.3). On Mon, Mar 2, 2015 at 1:55 AM, Wail w.alkowail...@cces-kacst-mit.org wrote: Dears, I'm just curious about the complexity of the query optimizer. Can the optimizer evaluates what after the SQL? maybe it's a stupid question ,, but here is an example to show the case: From the Spark SQL example: val teenagers = sqlContext.sql(SELECT * FROM people WHERE age = 13 AND age = 19) if(condition) { teenagers.map(t = Name: + t(0)).collect().foreach(println) } else { teenagers.map(t = Age: + t(1)).collect().foreach(println) } As for instance ... is the optimizer aware that I need only one column and pushes down the projection to bring only one as needed? Thanks! -- View this message in context: http://apache-spark-developers-list.1001551.n3.nabble.com/Is-SparkSQL-optimizer-aware-of-the-needed-data-after-the-query-tp10835.html Sent from the Apache Spark Developers List mailing list archive at Nabble.com. - To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For additional commands, e-mail: dev-h...@spark.apache.org
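To make the point above concrete (a sketch reusing the tables and columns from the question): a lambda inside .map() is opaque to Catalyst, so if only one column is needed, say so in the SQL itself and the projection can be pushed down.

// Opaque to the optimizer: all columns are selected, then most are dropped in a lambda.
val teenagers = sqlContext.sql("SELECT * FROM people WHERE age >= 13 AND age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

// Visible to the optimizer: only `name` is requested, so only that column needs to be read.
val names = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
names.map(t => "Name: " + t(0)).collect().foreach(println)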
Re: SparkSQL production readiness
We have been using Spark SQL in production for our customers at Databricks for almost a year now. We also know of some very large production deployments elsewhere. It is still a young project, but I wouldn't call it alpha. The primary changes to the API are the addition of the DataFrame interface, which is an expansion of the DSL that was already there. All of the SQL / HiveQL stuff remains unchanged, as well as the internal execution engine. DataFrames are still marked experimental, since as you said, we should let people use them before cementing them. On Mon, Mar 2, 2015 at 7:29 AM, Daniel Siegmann daniel.siegm...@teamaol.com wrote: I thought removing the alpha tag just meant the API was stable? Speaking of which, aren't there major changes to the API coming in 1.3? Why are you marking the API as stable before these changes have been widely used? On Sat, Feb 28, 2015 at 5:17 PM, Michael Armbrust mich...@databricks.com wrote: We are planning to remove the alpha tag in 1.3.0. On Sat, Feb 28, 2015 at 12:30 AM, Wang, Daoyuan daoyuan.w...@intel.com wrote: Hopefully the alpha tag will be remove in 1.4.0, if the community can review code a little bit faster :P Thanks, Daoyuan *From:* Ashish Mukherjee [mailto:ashish.mukher...@gmail.com] *Sent:* Saturday, February 28, 2015 4:28 PM *To:* user@spark.apache.org *Subject:* SparkSQL production readiness Hi, I am exploring SparkSQL for my purposes of performing large relational operations across a cluster. However, it seems to be in alpha right now. Is there any indication when it would be considered production-level? I don't see any info on the site. Regards, Ashish
Re: Is SPARK_CLASSPATH really deprecated?
Just a note for whoever writes the doc, spark.executor.extraClassPath is *prepended* to the executor's classpath, which is a rather important distinction. :-) On Fri, Feb 27, 2015 at 12:21 AM, Patrick Wendell pwend...@gmail.com wrote: I think we need to just update the docs, it is a bit unclear right now. At the time, we made it worded fairly sternly because we really wanted people to use --jars when we deprecated SPARK_CLASSPATH. But there are other types of deployments where there is a legitimate need to augment the classpath of every executor. I think it should probably say something more like Extra classpath entries to append to the classpath of executors. This is sometimes used in deployment environments where dependencies of Spark are present in a specific place on all nodes. Kannan - if you want to submit I patch I can help review it. On Thu, Feb 26, 2015 at 8:24 PM, Kannan Rajah kra...@maprtech.com wrote: Thanks Marcelo. Do you think it would be useful to make spark.executor.extraClassPath be made to pick up some environment variable that can be set from spark-env.sh? Here is a example. spark-env.sh -- executor_extra_cp = get_hbase_jars_for_cp export executor_extra_cp spark-defaults.conf - spark.executor.extraClassPath = ${executor_extra_cp} This will let us add logic inside get_hbase_jars_for_cp function to pick the right version hbase jars. There could be multiple versions installed on the node. -- Kannan On Thu, Feb 26, 2015 at 6:08 PM, Marcelo Vanzin van...@cloudera.com wrote: On Thu, Feb 26, 2015 at 5:12 PM, Kannan Rajah kra...@maprtech.com wrote: Also, I would like to know if there is a localization overhead when we use spark.executor.extraClassPath. Again, in the case of hbase, these jars would be typically available on all nodes. So there is no need to localize them from the node where job was submitted. I am wondering if we use the SPARK_CLASSPATH approach, then it would not do localization. That would be an added benefit. Please clarify. spark.executor.extraClassPath doesn't localize anything. It just prepends those classpath entries to the usual classpath used to launch the executor. There's no copying of files or anything, so they're expected to exist on the nodes. It's basically exactly the same as SPARK_CLASSPATH, but broken down to two options (one for the executors, and one for the driver). -- Marcelo -- Marcelo - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
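For reference, the two supported approaches discussed above look like this in practice. The HBase jar paths are purely illustrative, and note that spark.executor.extraClassPath copies nothing: the jars must already exist at the same path on every node.

# distribute jars with the application:
spark-submit --jars /opt/hbase/lib/hbase-client.jar,/opt/hbase/lib/hbase-common.jar ...

# or point at jars already installed on every node (conf/spark-defaults.conf):
spark.executor.extraClassPath  /opt/hbase/lib/hbase-client.jar:/opt/hbase/lib/hbase-common.jar
spark.driver.extraClassPath    /opt/hbase/lib/hbase-client.jar:/opt/hbase/lib/hbase-common.jar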
Issues reading in Json file with spark sql
Hi All, I am currently having issues reading in a json file using spark sql's api. Here is what the json file looks like: { namespace: spacey, name: namer, type: record, fields: [ {name:f1,type:[null,string]}, {name:f2,type:[null,string]}, {name:f3,type:[null,string]}, {name:f4,type:[null,string]}, {name:f5,type:[null,string]}, {name:f6,type:[null,string]}, {name:f7,type:[null,string]}, {name:f8,type:[null,string]}, {name:f9,type:[null,string]}, {name:f10,type:[null,string]}, {name:f11,type:[null,string]}, {name:f12,type:[null,string]}, {name:f13,type:[null,string]}, {name:f14,type:[null,string]}, {name:f15,type:[null,string]} ] } This is what I am doing to read in the json file(using spark sql in the spark shell on CDH5.3): val sqlsc = new org.apache.spark.sql.SQLContext(sc) val j = sqlsc.jsonFile(/tmp/try.avsc) This is what I am getting as an error: 15/03/02 11:23:45 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 12, 10.0.2.15): scala.MatchError: namespace (of class java.lang.String) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851) at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350) at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.1 in stage 3.0 (TID 14, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 13) in 128 ms on 10.0.2.15 (1/2) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.1 in stage 3.0 (TID 14) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 1] 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.2 in stage 3.0 (TID 15, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.2 in stage 3.0 (TID 15) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 2] 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.3 in stage 3.0 (TID 16, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.3 in stage 3.0 (TID 16) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 3] 15/03/02 11:23:45 ERROR TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job 15/03/02 11:23:45 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/03/02 11:23:45 INFO TaskSchedulerImpl: Cancelling stage 3 15/03/02 11:23:45 INFO DAGScheduler: Job 3 failed: reduce at 
JsonRDD.scala:57, took 0.210707 s org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 16, 10.0.2.15): scala.MatchError: namespace (of class java.lang.String) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851) at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350) at
Re: Issues reading in Json file with spark sql
Is the string of the above JSON object in the same line? jsonFile requires that every line is a JSON object or an array of JSON objects. On Mon, Mar 2, 2015 at 11:28 AM, kpeng1 kpe...@gmail.com wrote: Hi All, I am currently having issues reading in a json file using spark sql's api. Here is what the json file looks like: { namespace: spacey, name: namer, type: record, fields: [ {name:f1,type:[null,string]}, {name:f2,type:[null,string]}, {name:f3,type:[null,string]}, {name:f4,type:[null,string]}, {name:f5,type:[null,string]}, {name:f6,type:[null,string]}, {name:f7,type:[null,string]}, {name:f8,type:[null,string]}, {name:f9,type:[null,string]}, {name:f10,type:[null,string]}, {name:f11,type:[null,string]}, {name:f12,type:[null,string]}, {name:f13,type:[null,string]}, {name:f14,type:[null,string]}, {name:f15,type:[null,string]} ] } This is what I am doing to read in the json file(using spark sql in the spark shell on CDH5.3): val sqlsc = new org.apache.spark.sql.SQLContext(sc) val j = sqlsc.jsonFile(/tmp/try.avsc) This is what I am getting as an error: 15/03/02 11:23:45 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 12, 10.0.2.15): scala.MatchError: namespace (of class java.lang.String) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851) at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350) at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.1 in stage 3.0 (TID 14, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 13) in 128 ms on 10.0.2.15 (1/2) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.1 in stage 3.0 (TID 14) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 1] 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.2 in stage 3.0 (TID 15, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.2 in stage 3.0 (TID 15) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 2] 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.3 in stage 3.0 (TID 16, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.3 in stage 3.0 (TID 16) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 3] 15/03/02 11:23:45 ERROR TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job 15/03/02 11:23:45 INFO 
TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/03/02 11:23:45 INFO TaskSchedulerImpl: Cancelling stage 3 15/03/02 11:23:45 INFO DAGScheduler: Job 3 failed: reduce at JsonRDD.scala:57, took 0.210707 s org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 16, 10.0.2.15): scala.MatchError: namespace (of class java.lang.String) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) at
Re: What joda-time dependency does spark submit use/need?
Hi Todd, So I am already specifying joda-time-2.7 (have tried 2.2, 2.3, 2.6, 2.7) in the --jars option. I tried using the joda-time bundle jar ( http://mvnrepository.com/artifact/org.apache.servicemix.bundles/org.apache.servicemix.bundles.joda-time/2.3_1) which comes with joda-convert. I know its pointing to the appropriate jar as I included this in my java program: System.out.println(DateTimeFormat.forPattern(MMdd) .getClass() .getProtectionDomain() .getCodeSource() .getLocation()); and it tells me that it's pointing to the jar that i'm giving it in the --jars option. It would be helpful to know if the error I am getting is because of spark-submit or the java app. Thank you! Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at java.lang.Class.newInstance(Class.java:374) at com.amazonaws.auth.SignerFactory.createSigner(SignerFactory.java:119) at com.amazonaws.auth.SignerFactory.lookupAndCreateSigner(SignerFactory.java:105) at com.amazonaws.auth.SignerFactory.getSigner(SignerFactory.java:78) at com.amazonaws.AmazonWebServiceClient.computeSignerByServiceRegion(AmazonWebServiceClient.java:307) at com.amazonaws.AmazonWebServiceClient.computeSignerByURI(AmazonWebServiceClient.java:280) at com.amazonaws.AmazonWebServiceClient.setEndpoint(AmazonWebServiceClient.java:160) at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.setEndpoint(AmazonDynamoDBClient.java:2946) at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.init(AmazonDynamoDBClient.java:351) at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.init(AmazonDynamoDBClient.java:273) at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.init(AmazonDynamoDBClient.java:250) at AmazonDynamoDBSample.init(AmazonDynamoDBSample.java:81) at AmazonDynamoDBSample.main(AmazonDynamoDBSample.java:87) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358) at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) Caused by: java.lang.ClassNotFoundException: org.joda.time.format.DateTimeFormat at java.net.URLClassLoader$1.run(URLClassLoader.java:366) at java.net.URLClassLoader$1.run(URLClassLoader.java:355) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:354) at java.lang.ClassLoader.loadClass(ClassLoader.java:425) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308) at java.lang.ClassLoader.loadClass(ClassLoader.java:358) Best, Su On Fri, Feb 27, 2015 at 4:14 PM, Todd Nist tsind...@gmail.com wrote: You can specify these jars (joda-time-2.7.jar, joda-convert-1.7.jar) either as part of your build and assembly or via the --jars option to spark-submit. HTH. 
On Fri, Feb 27, 2015 at 2:48 PM, Su She suhsheka...@gmail.com wrote: Hello Everyone, I'm having some issues launching (non-spark) applications via the spark-submit commands. The common error I am getting is c/p below. I am able to submit a spark streaming/kafka spark application, but can't start a dynamoDB java app. The common error is related to joda-time. 1) I realized spark-submit was pointing to joda-time-1.6 in the hadoop/lib,so I deleted this and my error changed from NoSuchMethodFound to NoClassDefFoundError. Instead of pointing to the other version of joda-time in the hadoop/lib, it now pointed to the jars I set a path to in my spark-submit command (I tried joda-time versions 2.2, 2.3, 2.6, 2.7), but still got the errors 2) My rudimentary theory is that spark-submit uses joda-time-2.0, but the applications I'm running need 2.0. Thank you for the help! Exception in thread main java.lang.NoClassDefFoundError: org/joda/time/format/DateTimeFormat at com.amazonaws.auth.AWS4Signer.clinit(AWS4Signer.java:44) at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
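One avenue the thread does not reach a conclusion on, so treat this as a guess rather than a confirmed fix: the NoClassDefFoundError above is thrown while the driver's main class is initializing, and jars passed via --jars are not necessarily visible that early, so putting joda-time on the driver's own classpath as well may behave differently. The jar names are the ones mentioned in the thread; the application jar name is illustrative.

spark-submit \
  --jars joda-time-2.7.jar,joda-convert-1.7.jar \
  --driver-class-path joda-time-2.7.jar:joda-convert-1.7.jar \
  --class AmazonDynamoDBSample \
  your-app.jar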
External Data Source in Spark
Hi Colleagues, Currently we have implemented the External Data Source API and are able to push down filters and projections. Could you provide some info on how the joins could perhaps also be pushed to the original Data Source, if both data sources are from the same database? I briefly looked at DataSourceStrategy.scala but could not get far. Best Regards Santosh
Re: Issues reading in Json file with spark sql
According to Spark SQL Programming Guide: jsonFile - loads data from a directory of JSON files where each line of the files is a JSON object. Note that the file that is offered as jsonFile is not a typical JSON file. Each line must contain a separate, self-contained valid JSON object. As a consequence, a regular multi-line JSON file will most often fail. -- Emre Sevinç http://www.bigindustries.be On Mar 2, 2015 8:29 PM, kpeng1 kpe...@gmail.com wrote: Hi All, I am currently having issues reading in a json file using spark sql's api. Here is what the json file looks like: { namespace: spacey, name: namer, type: record, fields: [ {name:f1,type:[null,string]}, {name:f2,type:[null,string]}, {name:f3,type:[null,string]}, {name:f4,type:[null,string]}, {name:f5,type:[null,string]}, {name:f6,type:[null,string]}, {name:f7,type:[null,string]}, {name:f8,type:[null,string]}, {name:f9,type:[null,string]}, {name:f10,type:[null,string]}, {name:f11,type:[null,string]}, {name:f12,type:[null,string]}, {name:f13,type:[null,string]}, {name:f14,type:[null,string]}, {name:f15,type:[null,string]} ] } This is what I am doing to read in the json file(using spark sql in the spark shell on CDH5.3): val sqlsc = new org.apache.spark.sql.SQLContext(sc) val j = sqlsc.jsonFile(/tmp/try.avsc) This is what I am getting as an error: 15/03/02 11:23:45 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 12, 10.0.2.15): scala.MatchError: namespace (of class java.lang.String) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.TraversableOnce$class.reduceLeft(TraversableOnce.scala:172) at scala.collection.AbstractIterator.reduceLeft(Iterator.scala:1157) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:853) at org.apache.spark.rdd.RDD$$anonfun$18.apply(RDD.scala:851) at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350) at org.apache.spark.SparkContext$$anonfun$29.apply(SparkContext.scala:1350) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61) at org.apache.spark.scheduler.Task.run(Task.scala:56) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.1 in stage 3.0 (TID 14, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 13) in 128 ms on 10.0.2.15 (1/2) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.1 in stage 3.0 (TID 14) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 1] 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.2 in stage 3.0 (TID 15, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO TaskSetManager: Lost task 0.2 in stage 3.0 (TID 15) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 2] 15/03/02 11:23:45 INFO TaskSetManager: Starting task 0.3 in stage 3.0 (TID 16, 10.0.2.15, ANY, 1308 bytes) 15/03/02 11:23:45 INFO 
TaskSetManager: Lost task 0.3 in stage 3.0 (TID 16) on executor 10.0.2.15: scala.MatchError (namespace (of class java.lang.String)) [duplicate 3] 15/03/02 11:23:45 ERROR TaskSetManager: Task 0 in stage 3.0 failed 4 times; aborting job 15/03/02 11:23:45 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/03/02 11:23:45 INFO TaskSchedulerImpl: Cancelling stage 3 15/03/02 11:23:45 INFO DAGScheduler: Job 3 failed: reduce at JsonRDD.scala:57, took 0.210707 s org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 16, 10.0.2.15): scala.MatchError: namespace (of class java.lang.String) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:305) at org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$2.apply(JsonRDD.scala:303) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at
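Putting the two replies above together: the file being read is an Avro schema (.avsc), i.e. a single multi-line JSON document, which is exactly what jsonFile cannot handle. A sketch of input that jsonFile does accept - one self-contained JSON object per line - and of reading it follows; the file path and field names are illustrative.

// /tmp/records.json - every line is a complete JSON object:
//   {"name": "f1", "type": "string"}
//   {"name": "f2", "type": "string"}

val sqlsc = new org.apache.spark.sql.SQLContext(sc)
val records = sqlsc.jsonFile("/tmp/records.json")   // schema is inferred per line
records.printSchema()
records.registerTempTable("records")
sqlsc.sql("SELECT name FROM records").collect().foreach(println)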