Re: Parquet problems
No, never really resolved the problem, except by increasing the PermGen space, which only partially solved it. I still have to restart the job multiple times to make the whole job complete (it stores intermediate results). The Parquet data sources have about 70 columns, and yes Cheng, it works fine when only loading a small sample of the data.

Thankful for any hints,
Anders

On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian lian.cs@gmail.com wrote:

How many columns are there in these Parquet files? Could you load a small portion of the original large dataset successfully?

Cheng

On 6/25/15 5:52 PM, Anders Arpteg wrote:

Yes, both the driver and the executors. It works a little better with more space, but there is still a leak that causes a failure after a number of reads. There are about 700 different data sources that need to be loaded, lots of data...

On Thu 25 Jun 2015 08:02, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:

Did you try increasing the PermGen space for the driver?

Regards
Sab

On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote:

When reading large (and many) datasets with the Spark 1.4.0 DataFrames Parquet reader (the org.apache.spark.sql.parquet format), the following exceptions are thrown:

    Exception in thread "task-result-getter-0"
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "task-result-getter-0"
    Exception in thread "task-result-getter-3" java.lang.OutOfMemoryError: PermGen space
    Exception in thread "task-result-getter-1" java.lang.OutOfMemoryError: PermGen space
    Exception in thread "task-result-getter-2" java.lang.OutOfMemoryError: PermGen space

and many more like these from different threads. I've tried increasing the PermGen space using the -XX:MaxPermSize VM setting, but even after tripling the space, the same errors occur. I've also tried storing intermediate results, and am able to get the full job completed by running it multiple times and starting from the last successful intermediate result. There seems to be some memory leak in the Parquet format. Any hints on how to fix this problem?

Thanks,
Anders
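The restart-from-intermediate-results workaround Anders describes can be automated roughly as follows. This is a Python sketch under assumptions: `process` is a hypothetical per-source function that loads one data source and writes its intermediate result, and empty marker files stand in for whatever the real job uses to record completion. It only makes reruns cheaper; it does not address the underlying PermGen leak.

```python
import os


def run_with_checkpoints(sources, process, done_dir):
    """Process each source once, skipping sources already marked as done.

    A marker file is written only after process(name) returns, so a rerun
    after a crash resumes with the first source that did not finish.
    Returns the names processed in this run.
    """
    os.makedirs(done_dir, exist_ok=True)
    processed = []
    for name in sources:
        marker = os.path.join(done_dir, name + ".done")
        if os.path.exists(marker):
            continue  # finished in an earlier run
        process(name)  # hypothetical: load one source, store its result
        open(marker, "w").close()  # record success only after it completes
        processed.append(name)
    return processed
```

Because a failing `process` call raises before its marker is written, simply rerunning the same call picks up where the previous run stopped.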
spark.deploy.spreadOut core allocation
Hello,

I've set spark.deploy.spreadOut=false in spark-env.sh:

    export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.deploy.spreadOut=false"

There are 3 workers, each with 4 cores. spark-shell was started with the number of cores set to 6. The Spark UI shows that one executor was used, with 6 cores. Is this a bug? This is with Spark 1.4.

[image: Inline image 1]

Srikanth
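To make the expected layout concrete, here is a simplified model of the two strategies (round-robin spreading versus filling one worker before moving to the next). It is written in Python as an illustration only, not a transcription of the standalone Master's scheduling code. For 6 cores on 3 workers with 4 free cores each, the packed layout comes out as 4 + 2 cores on two workers, since no single worker in this setup has 6 cores to give.

```python
def allocate_cores(free_cores, cores_wanted, spread_out):
    """Simplified model of assigning an application's cores to workers.

    free_cores: free cores on each worker. Returns the cores assigned per worker.
    """
    assigned = [0] * len(free_cores)
    remaining = cores_wanted
    if spread_out:
        # Round-robin: give one core at a time to each worker that has room.
        while remaining > 0 and any(a < f for a, f in zip(assigned, free_cores)):
            for i, free in enumerate(free_cores):
                if remaining > 0 and assigned[i] < free:
                    assigned[i] += 1
                    remaining -= 1
    else:
        # Packed: fill each worker completely before moving to the next one.
        for i, free in enumerate(free_cores):
            take = min(free, remaining)
            assigned[i] = take
            remaining -= take
    return assigned


print(allocate_cores([4, 4, 4], 6, spread_out=True))   # [2, 2, 2]
print(allocate_cores([4, 4, 4], 6, spread_out=False))  # [4, 2, 0]
```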
Re: user threads in executors
Thanks! I am using Spark Streaming 1.3, and if some post fails for any reason, I will store the offset of that message in another Kafka topic. I want to read these offsets in another Spark job and, from them, read the original Kafka topic's messages based on these offsets. So is it possible in a Spark job to get Kafka messages based on random offsets? Or is there any better alternative to handle the failure of a post request?

On Wed, Jul 22, 2015 at 11:31 AM, Tathagata Das t...@databricks.com wrote:

Yes, you could unroll from the iterator in batches of 100-200 and then post them in multiple rounds. If you are using the Kafka receiver-based approach (not Direct), then the raw Kafka data is stored in the executor memory. If you are using Direct Kafka, then it is read from Kafka directly at the time of filtering.

TD

On Tue, Jul 21, 2015 at 9:34 PM, Shushant Arora shushantaror...@gmail.com wrote:

I can post multiple items at a time. Data is being read from Kafka and filtered; after that it is posted. Does foreachPartition load the complete partition in memory, or does it use an iterator of batches underneath? If the complete batch is not loaded, will using a custom size of 100-200 requests in one batch and posting them help, instead of the whole partition?

On Wed, Jul 22, 2015 at 12:18 AM, Tathagata Das t...@databricks.com wrote:

If you can post multiple items at a time, then use foreachPartition to post the whole partition in a single request.

On Tue, Jul 21, 2015 at 9:35 AM, Richard Marscher rmarsc...@localytics.com wrote:

You can certainly create threads in a map transformation. We do this to do concurrent DB lookups during one stage, for example. I would recommend, however, that you switch from map to mapPartitions, as this allows you to create a fixed-size thread pool to share across the items of a partition, as opposed to spawning a future per record in the RDD, for example.

On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora shushantaror...@gmail.com wrote:

Hi, can I create user threads in executors? I have a streaming app where, after processing, I have a requirement to push events to an external system. Each post request costs ~90-100 ms. To make the posts parallel, I cannot use the same thread, because that is limited by the number of cores available in the system. Can I use user threads in a Spark app? I tried to create 2 threads in a map task and it worked. Is there any upper limit on the number of user threads in a Spark executor? Is it a good idea to create user threads in a Spark map task?

Thanks

--
*Richard Marscher*
Software Engineer
Localytics
Localytics.com http://localytics.com/ | Our Blog http://localytics.com/blog | Twitter http://twitter.com/localytics | Facebook http://facebook.com/localytics | LinkedIn http://www.linkedin.com/company/1148792?trk=tyah
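The two suggestions in this thread (Richard: one fixed-size thread pool per partition; TD: unroll the iterator in batches of 100-200 and post them in rounds) could be combined roughly as follows. This is a Python sketch under assumptions: `post_batch` stands for whatever client call sends one batch to the external system and returns True on success, and the default batch and pool sizes are arbitrary. Only `pool_size` batches are pulled from the iterator per round, so the whole partition is never held in memory at once.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice


def batches(records, size):
    """Yield lists of up to `size` records, pulling lazily from the iterator."""
    it = iter(records)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def _post_safely(post_batch, batch):
    """Treat an exception raised by post_batch as a failed post."""
    try:
        return bool(post_batch(batch))
    except Exception:
        return False


def post_partition(records, post_batch, batch_size=100, pool_size=4):
    """Post one partition's records in batches on a fixed-size thread pool.

    Returns the batches whose post failed.
    """
    failed = []
    pending = batches(records, batch_size)
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        while True:
            # Take at most pool_size batches per round to bound memory use.
            current = list(islice(pending, pool_size))
            if not current:
                break
            outcomes = pool.map(lambda b: _post_safely(post_batch, b), current)
            failed.extend(b for b, ok in zip(current, outcomes) if not ok)
    return failed
```

From a job it would be called once per partition, e.g. `rdd.foreachPartition(lambda it: post_partition(it, post_batch))`. Since foreachPartition discards return values, the failed batches (or their offsets) would have to be recorded inside the function, for example in the Kafka topic Shushant mentions.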
problems running Spark on a firewalled remote YARN cluster via SOCKS proxy
I am trying to run Spark applications with the driver running locally and interacting with a firewalled remote cluster via a SOCKS proxy. I have to modify the Hadoop configuration on the *local machine* to try to make this work, adding

    <property>
      <name>hadoop.rpc.socket.factory.class.default</name>
      <value>org.apache.hadoop.net.SocksSocketFactory</value>
    </property>
    <property>
      <name>hadoop.socks.server</name>
      <value>localhost:9998</value>
    </property>

and on the *remote cluster* side

    <property>
      <name>hadoop.rpc.socket.factory.class.default</name>
      <value>org.apache.hadoop.net.StandardSocketFactory</value>
      <final>true</final>
    </property>

With this setup, and running ssh -D 9998 gateway.host to start the proxy connection, MapReduce jobs started on the local machine execute fine on the remote cluster. However, launching a Spark job fails, with the nodes of the cluster apparently unable to communicate with one another:

    java.io.IOException: Failed on local exception: java.net.SocketException: Connection refused; Host Details : local host is: node3/10.211.55.103; destination host is: node1:8030;

Looking at the packets being sent to node1 from node3, it's clear that no requests are made on port 8030, hinting that the connection is somehow being proxied. Is it possible that the Spark job is not honoring the socket.factory settings on the *cluster* side for some reason?

Note that Spark JIRA 5004 (https://issues.apache.org/jira/browse/SPARK-5004) addresses a similar problem, though it looks like they are actually not the same (since in that case it sounds like a standalone cluster is being used).

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/problems-running-Spark-on-a-firewalled-remote-YARN-cluster-via-SOCKS-proxy-tp23955.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
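One way to narrow this down is to check from node3 whether node1:8030 (by default, the YARN ResourceManager scheduler port) accepts a plain, unproxied TCP connection at all, independently of Hadoop's socket-factory configuration. A minimal Python sketch using only the standard library; if it returns True while the Spark job still fails, that would be consistent with the suspicion above that the failing connection is being proxied rather than refused by node1 itself.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a direct TCP connection to host:port succeeds.

    Uses a plain socket, so no Hadoop socket factory or SOCKS proxy is involved.
    Name-resolution failures, refusals and timeouts all count as False.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False

# Example, run on node3: can_connect("node1", 8030)
```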
Re: user threads in executors
Yes, look at KafkaUtils.createRDD.

On Wed, Jul 22, 2015 at 11:17 AM, Shushant Arora shushantaror...@gmail.com wrote:

Thanks! I am using Spark Streaming 1.3, and if some post fails for any reason, I will store the offset of that message in another Kafka topic. I want to read these offsets in another Spark job and, from them, read the original Kafka topic's messages based on these offsets. So is it possible in a Spark job to get Kafka messages based on random offsets? Or is there any better alternative to handle the failure of a post request?
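KafkaUtils.createRDD reads a batch RDD from explicit offset ranges (topic, partition, fromOffset, untilOffset) rather than from a list of individual offsets, so the failed offsets Shushant stores would first need to be grouped into ranges. A Python sketch of that grouping: `Range` is a hypothetical stand-in that mirrors the fields of Spark's OffsetRange (with the end offset exclusive, as in OffsetRange), and recording failures as (topic, partition, offset) triples is an assumption.

```python
from collections import namedtuple
from itertools import groupby

# Hypothetical stand-in mirroring Spark's OffsetRange fields; until_offset is exclusive.
Range = namedtuple("Range", "topic partition from_offset until_offset")


def to_ranges(failed):
    """Collapse (topic, partition, offset) triples into contiguous ranges."""
    ranges = []
    # Deduplicate, then sort so each (topic, partition) group is contiguous.
    for (topic, partition), group in groupby(sorted(set(failed)), key=lambda t: (t[0], t[1])):
        offsets = [offset for _, _, offset in group]
        start = prev = offsets[0]
        for offset in offsets[1:]:
            if offset != prev + 1:
                ranges.append(Range(topic, partition, start, prev + 1))
                start = offset
            prev = offset
        ranges.append(Range(topic, partition, start, prev + 1))
    return ranges
```

Each resulting range would then be converted to Spark's own OffsetRange before being passed to createRDD. Grouping keeps the number of ranges small when failures cluster together, which is common when an external service is down for a while.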
Re: Parquet problems
How many columns are there in these Parquet files? Could you load a small portion of the original large dataset successfully?

Cheng

On 6/25/15 5:52 PM, Anders Arpteg wrote:

Yes, both the driver and the executors. It works a little better with more space, but there is still a leak that causes a failure after a number of reads. There are about 700 different data sources that need to be loaded, lots of data...

On Thu 25 Jun 2015 08:02, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:

Did you try increasing the PermGen space for the driver?

Regards
Sab
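For reference, one way to pass a larger PermGen setting to both the driver and the executors at submit time, sketched as a small Python helper that builds spark-submit arguments. The 512m default is an arbitrary placeholder. In client mode the driver JVM is already running when the application's SparkConf is read, which is why the driver option goes on the command line (`--driver-java-options`) while the executors get `spark.executor.extraJavaOptions`. Note that `-XX:MaxPermSize` only applies to Java 7 and earlier; Java 8 replaced PermGen with Metaspace.

```python
def permgen_submit_args(max_perm_size="512m"):
    """Build spark-submit arguments that raise MaxPermSize on driver and executors."""
    flag = "-XX:MaxPermSize=" + max_perm_size
    return [
        # Driver: passed on the command line, since in client mode its JVM
        # is already running when SparkConf is read.
        "--driver-java-options", flag,
        # Executors: launched later, so a --conf property reaches them.
        "--conf", "spark.executor.extraJavaOptions=" + flag,
    ]
```

Usage, with `my_job.py` as a placeholder: `["spark-submit"] + permgen_submit_args("1g") + ["my_job.py"]`.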
Re: Spark-hive parquet schema evolution
Yeah, the benefit of `saveAsTable` is that you don't need to deal with the schema explicitly, while the benefit of ALTER TABLE is that you still have a standard vanilla Hive table.

Cheng

On 7/22/15 11:00 PM, Dean Wampler wrote:

While it's not recommended to overwrite files Hive thinks it understands, you can add the column to Hive's metastore with an ALTER TABLE command, using HiveQL in the Hive shell or using HiveContext.sql():

    ALTER TABLE mytable ADD COLUMNS (col_name data_type)

See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column for full details.

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly)
Typesafe http://typesafe.com
@deanwampler http://twitter.com/deanwampler
http://polyglotprogramming.com

On Wed, Jul 22, 2015 at 4:36 AM, Cheng Lian lian.cs@gmail.com wrote:

Since Hive doesn't support schema evolution, you'll have to update the schema stored in the metastore somehow. For example, you can create a new external table with the merged schema. Say you have a Hive table `t1`:

    CREATE TABLE t1 (c0 INT, c1 DOUBLE);

By default, this table is stored in the HDFS path `hdfs://some-host:9000/user/hive/warehouse/t1`. Now you append some Parquet data with an extra column `c2` to the same directory:

    import org.apache.spark.sql.types._
    import sqlContext.implicits._  // for the 'symbol column syntax (auto-imported in spark-shell)

    val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
    val df1 = sqlContext.range(10).select('id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
    df1.write.mode("append").parquet(path)

Now you can create a new external table `t2` like this:

    val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
    df2.write.option("path", path).saveAsTable("t2")

Since we specified a path above, the newly created `t2` is an external table pointing to the original HDFS location. But the schema of `t2` is the merged version.

The drawback of this approach is that `t2` is actually a Spark SQL-specific data source table rather than a genuine Hive table. This means it can be accessed by Spark SQL only. We're just using the Hive metastore to help persist the metadata of the data source table. However, since you're asking how to access the new table via the Spark SQL CLI, this should work for you. We are working on making Parquet and ORC data source tables accessible via Hive in Spark 1.5.0.

Cheng

On 7/22/15 10:32 AM, Jerrick Hoang wrote:

Hi Lian,

Sorry, I'm new to Spark, so I did not express myself very clearly. I'm concerned about the situation where, let's say, I have a Parquet table with some partitions, I add a new column A to the Parquet schema, and I write some data with the new schema to a new partition in the table. If I'm not mistaken, if I do a sqlContext.read.parquet(table_path).printSchema() it will print the correct schema with the new column A. But if I do a 'describe table' from the Spark SQL CLI, I won't see the new column. I understand that this is because Hive doesn't support schema evolution. So what is the best way to support CLI queries in this situation? Do I need to manually alter the table every time the underlying schema changes?

Thanks

On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian lian.cs@gmail.com wrote:

Hey Jerrick,

What do you mean by schema evolution with Hive metastore tables? Hive doesn't take schema evolution into account. Could you please give a concrete use case? Are you trying to write Parquet data with extra columns into an existing metastore Parquet table?

Cheng

On 7/21/15 1:04 AM, Jerrick Hoang wrote:

I'm new to Spark, any ideas would be much appreciated! Thanks

On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang jerrickho...@gmail.com wrote:

Hi all,

I'm aware of the support for schema evolution via the DataFrame API. Just wondering what would be the best way to go about dealing with schema evolution with Hive metastore tables. So, say I create a table via the Spark SQL CLI: how would I deal with Parquet schema evolution?

Thanks,
J
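If you go the ALTER TABLE route Dean describes, the statement could be generated whenever new columns show up in the merged Parquet schema, instead of being written by hand each time. A Python sketch: `add_columns_ddl` is a hypothetical helper, obtaining the two column lists (e.g. from DESCRIBE output and the merged Parquet schema) is left out, and it only handles added columns, not renamed, dropped, or retyped ones. The resulting string could be passed to HiveContext.sql().

```python
def add_columns_ddl(table, metastore_cols, parquet_cols):
    """Return an ALTER TABLE ... ADD COLUMNS statement for new columns, or None.

    Both column arguments are lists of (name, hive_type) pairs. Names are
    compared case-insensitively, as Hive column names are case-insensitive.
    """
    known = {name.lower() for name, _ in metastore_cols}
    new = [(name, typ) for name, typ in parquet_cols if name.lower() not in known]
    if not new:
        return None
    cols = ", ".join("`{}` {}".format(name, typ) for name, typ in new)
    return "ALTER TABLE {} ADD COLUMNS ({})".format(table, cols)


# Using the t1 example from this thread:
print(add_columns_ddl("t1",
                      [("c0", "INT"), ("c1", "DOUBLE")],
                      [("c0", "INT"), ("c1", "DOUBLE"), ("c2", "STRING")]))
# ALTER TABLE t1 ADD COLUMNS (`c2` STRING)
```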
Performance issue with Spark's foreachPartition method
Hello all,

We are having a major performance issue with Spark, which is holding us back from going live.

We have a job that carries out computation on log files and writes the results into an Oracle DB.

The reducer 'reduceByKey' has been set to a parallelism of 4, as we don't want to establish too many DB connections.

We then call foreachPartition on the RDD pairs that were reduced by key. Within this foreachPartition method we establish a DB connection, iterate over the results, prepare the Oracle statement for batch insertion, commit the batch, and close the connection. All of this works fine.

However, when we execute the job to process 12GB of data, it takes forever to complete, especially at the foreachPartition stage.

We submitted the job with 6 executors, 2 cores, and 6GB of memory, of which 0.3 is assigned to spark.storage.memoryFraction.

The job takes about 50 minutes to complete, which is not ideal. I'm not sure how we could improve the performance. I've provided the main body of the code; please take a look and advise.

From the driver:

    reduceResultsRDD.foreachPartition(new DB.InsertFunction(dbuser, dbpass, batchsize));

DB class:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Iterator;

    import org.apache.spark.api.java.function.VoidFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import scala.Tuple2;

    public class DB {
        private static final Logger logger = LoggerFactory.getLogger(DB.class);

        public static class InsertFunction implements VoidFunction<Iterator<Tuple2<String, String>>> {
            private static final long serialVersionUID = 55766876878L;
            private String dbuser = "";
            private String dbpass = "";
            private int batchsize;

            public InsertFunction(String dbuser, String dbpass, int batchsize) {
                super();
                this.dbuser = dbuser;
                this.dbpass = dbpass;
                this.batchsize = batchsize;
            }

            @Override
            public void call(Iterator<Tuple2<String, String>> results) {
                Connection connect = null;
                PreparedStatement pstmt = null;
                try {
                    // getDBConnection(...) belongs to the DB class (not shown)
                    connect = getDBConnection(dbuser, dbpass);
                    int count = 0;
                    if (batchsize <= 0) {
                        batchsize = 1;
                    }
                    pstmt = connect.prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
                    while (results.hasNext()) {
                        Tuple2<String, String> kv = results.next();
                        // Join key and value, then split them into the individual column values
                        String[] data = kv._1().concat("," + kv._2()).split(",");
                        pstmt.setString(1, data[0]);
                        pstmt.setString(2, data[1]);
                        // ...
                        pstmt.addBatch();
                        count++;
                        if (count == batchsize) {
                            logger.info("BulkCount : " + count);
                            pstmt.executeBatch();
                            connect.commit();
                            count = 0;
                        }
                        // As posted, this also executes and commits after every single record
                        pstmt.executeBatch();
                        connect.commit();
                    }
                    // Flush whatever remains of the last batch
                    pstmt.executeBatch();
                    connect.commit();
                } catch (Exception e) {
                    logger.error("InsertFunction error: " + e.getMessage());
                } finally {
                    try {
                        if (pstmt != null) {
                            pstmt.close();
                        }
                        if (connect != null) {
                            connect.close();
                        }
                    } catch (SQLException e) {
                        logger.error("InsertFunction Connection Close error: " + e.getMessage());
                    }
                }
            }
        }
    }
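For comparison, the "one execute and one commit per batch" pattern can be sketched in Python with the standard-library sqlite3 module standing in for Oracle. This is an illustration under assumptions: the `results` table is hypothetical and SQLite's `INSERT OR REPLACE` plays the role of the MERGE. The point of batching is that round trips and commits scale with the number of batches rather than the number of rows, so any execute or commit that runs once per row cancels most of the benefit.

```python
import sqlite3
from itertools import islice


def insert_in_batches(conn, rows, batch_size=1000):
    """Upsert (key, value) rows, executing and committing once per batch.

    Returns the number of commits issued.
    """
    batch_size = max(1, batch_size)
    it = iter(rows)
    commits = 0
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            break  # the last, partial batch was already flushed in the loop
        conn.executemany("INSERT OR REPLACE INTO results (k, v) VALUES (?, ?)", batch)
        conn.commit()
        commits += 1
    return commits
```

With 2,500 rows and a batch size of 1,000 this issues 3 commits instead of 2,500.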
Help accessing protected S3
I have a protected S3 bucket that requires a certain IAM role to access. When I start my cluster using the spark-ec2 script, everything works just fine until I try to read from that part of S3. Here is the command I am using:

    ./spark-ec2 -k KEY -i KEY_FILE.pem --additional-security-group=IAM_ROLE --copy-aws-credentials --zone=us-east-1e -t m1.large --worker-instances=3 --hadoop-major-version=2.7.1 --user-data=test.sh launch my-cluster

I have read through this thread: http://apache-spark-user-list.1001560.n3.nabble.com/S3-Bucket-Access-td16303.html

The problem seems to be very similar, but I wasn't able to find a solution in it for me. I'm not sure what else to provide here; just let me know what you need. Thanks in advance!
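One thing worth checking is whether the launched instances have an IAM role attached at all: `--additional-security-group` attaches an EC2 security group, which controls network access and is separate from an IAM role. A hedged Python sketch (standard library only) that queries the EC2 instance metadata service from a cluster node. It assumes the node allows plain (IMDSv1-style) metadata requests, and it returns the attached role name, or None when no role is attached or the service cannot be reached. Run it on a master or worker node, not on your local machine.

```python
import urllib.request

# EC2 instance metadata path that lists the IAM role attached to the instance.
ROLE_URL = "http://169.254.169.254/latest/meta-data/iam/security-credentials/"


def _fetch(url):
    with urllib.request.urlopen(url, timeout=2) as resp:
        return resp.read().decode("utf-8")


def attached_role(fetch=_fetch):
    """Return the name of the IAM role attached to this instance, or None.

    None means no role is attached (the metadata service answers 404) or the
    service could not be reached (e.g. the code is not running on EC2).
    """
    try:
        body = fetch(ROLE_URL).strip()
    except OSError:  # urllib's URLError and HTTPError are OSError subclasses
        return None
    return body.splitlines()[0] if body else None
```

The `fetch` parameter only exists so the lookup can be exercised without EC2; on a node you would simply call `attached_role()`.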
Re: Parquet problems
Hi guys,

I noticed that too. Anders, can you confirm that it works on the Spark 1.5 snapshot? This is what I tried in the end. It seems to be a 1.4 issue.

Best Regards,
Jerry

On Wed, Jul 22, 2015 at 11:46 AM, Anders Arpteg arp...@spotify.com wrote:

No, never really resolved the problem, except by increasing the PermGen space, which only partially solved it. I still have to restart the job multiple times to make the whole job complete (it stores intermediate results). The Parquet data sources have about 70 columns, and yes Cheng, it works fine when only loading a small sample of the data.

Thankful for any hints,
Anders
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hi Burak,

Looking at the source code, the intermediate RDDs used in ALS.train() are persisted during the computation using intermediateRDDStorageLevel (default value StorageLevel.MEMORY_AND_DISK); see here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L546, here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L548, and here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L556. At the end of the ALS calculation, these RDDs are neither needed nor returned, so I would assume the logical choice would be to unpersist() them. The strategy in the code seems to be set by finalRDDStorageLevel, and for some reason unpersist() is only called on the intermediate RDDs if finalRDDStorageLevel != StorageLevel.NONE, which seems counter-intuitive to me.

Jonathan

From: Burak Yavuz brk...@gmail.com
Date: Wednesday, July 22, 2015 at 10:47 AM
To: Stahlman Jonathan jonathan.stahl...@capitalone.com
Cc: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything. That's why the unpersist has an if statement before it. Could you give more information about your setup, please? Number of cores, memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan jonathan.stahl...@capitalone.com wrote:

Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn't make sense to me: I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan

From: Stahlman, Jonathan jonathan.stahl...@capitalone.com
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. Sample code in Python is copied below.

The issue I have is that each new model that is trained caches a set of RDDs, and eventually the executors run out of memory. Is there any way in PySpark to unpersist() these RDDs after each iteration? The names of the RDDs, which I gather from the UI, are:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3. Thank you for any help!

Regards,
Jonathan

    import itertools

    from pyspark.mllib.recommendation import ALS, Rating

    # data, rating and areaUnderCurve are defined elsewhere
    data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)

    functions = [rating]
    ranks = [10, 20]
    iterations = [10, 20]
    lambdas = [0.01, 0.1]
    alphas = [1.0, 50.0]

    results = []
    for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product(
            functions, ranks, iterations, lambdas, alphas):
        # train model
        ratings_train = data_train.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                  lambda_=float(m_lambda), alpha=float(m_alpha))

        # test performance on CV data
        ratings_cv = data_cv.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        auc = areaUnderCurve(ratings_cv, model.predictAll)

        # save results
        result = ",".join(str(l) for l in [ratingFunction.__name__, rank, numIterations,
                                           m_lambda, m_alpha, auc])
        results.append(result)

The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed.
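A workaround sometimes used from PySpark is to ask the JVM-side context which RDDs are still marked as persistent and unpersist them between iterations. This is a hedged sketch, not an official API: it relies on the internal `sc._jsc` handle (the underlying JavaSparkContext) and assumes your Spark version's JavaSparkContext exposes `getPersistentRDDs()`. `keep_ids` is a hypothetical parameter for RDDs you deliberately cached and want to keep, identified by `rdd.id()`.

```python
def unpersist_all_except(sc, keep_ids=()):
    """Unpersist every RDD the JVM reports as persistent, except keep_ids.

    Relies on the internal `sc._jsc` (JavaSparkContext) handle and its
    getPersistentRDDs() method; this is not part of the public PySpark API.
    Returns the sorted ids of the RDDs that were unpersisted.
    """
    keep = set(keep_ids)
    dropped = []
    # Copy the entries first so unpersisting cannot disturb the iteration.
    for rdd_id, jrdd in list(sc._jsc.getPersistentRDDs().items()):
        if rdd_id not in keep:
            jrdd.unpersist()
            dropped.append(rdd_id)
    return sorted(dropped)

# e.g. at the end of each loop iteration, after the AUC has been computed:
# unpersist_all_except(sc, keep_ids=[data_train.id()])
```

Anything not listed in `keep_ids` is dropped, including the previous model's cached factors, so this only makes sense once a model is no longer needed.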
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hi Jonathan,

I believe calling persist with StorageLevel.NONE doesn't do anything. That's why the unpersist has an if statement before it. Could you give more information about your setup, please? Number of cores, memory, number of partitions of ratings_train?

Thanks,
Burak

On Wed, Jul 22, 2015 at 10:38 AM, Stahlman, Jonathan jonathan.stahl...@capitalone.com wrote:

Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn't make sense to me: I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan
How to keep RDDs in memory between two different batch jobs?
Hi,

We have a requirement wherein we need to keep RDDs in memory between Spark batch jobs that run every hour. The idea is to keep the RDDs that hold active user sessions in memory between two jobs, so that once one job is done and the next one runs an hour later, the RDDs with active sessions are still available for joining with those in the current job.

So, what do we need in order to keep the data in memory between two batch jobs? Can we use Tachyon?

Thanks,
Swetha

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
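Cached RDDs belong to the SparkContext that created them, so they do not survive once a batch application exits; the session state either has to live outside the application (Tachyon, HDFS, a database) or the context has to stay alive between runs. A minimal Python sketch of the first pattern, with a local JSON file standing in for the external store and a hypothetical (user_id, event) record format. Session expiry is deliberately not handled.

```python
import json
import os


def run_hourly_batch(state_path, events):
    """One hourly run: load the previous run's active sessions, merge this
    hour's events into them, and save the result for the next run.

    events: iterable of (user_id, event) pairs. The JSON file stands in
    for an external store such as Tachyon or HDFS.
    """
    sessions = {}
    if os.path.exists(state_path):
        with open(state_path) as f:
            sessions = json.load(f)  # state left behind by the previous run
    for user_id, event in events:
        sessions.setdefault(user_id, []).append(event)
    with open(state_path, "w") as f:
        json.dump(sessions, f)  # hand the state over to the next run
    return sessions
```

In a Spark job the load and save steps would read and write an RDD or DataFrame on the shared store rather than a local file, and the merge would be a join on the user key.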
RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
To be Unpersisted the RDD must be persisted first. If it's set to None, then it's not persisted, and as such does not need to be freed. Does that make sense ? Thank you, Ilya Ganelin -Original Message- From: Stahlman, Jonathan [jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com] Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time To: user@spark.apache.org Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello again, In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around. Jonathan From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.commailto:jonathan.stahl...@capitalone.com Date: Thursday, July 16, 2015 at 2:18 PM To: user@spark.apache.orgmailto:user@spark.apache.org user@spark.apache.orgmailto:user@spark.apache.org Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. 
A sample of the code in Python is copied below. The issue I have is that each new model that is trained caches a set of RDDs, and eventually the executors run out of memory. Is there any way in PySpark to unpersist() these RDDs after each iteration? The names of the RDDs, which I gathered from the UI, are: itemInBlocks, itemOutBlocks, Products, ratingBlocks, userInBlocks, userOutBlocks, users. I am using Spark 1.3. Thank you for any help! Regards, Jonathan
    import itertools
    from pyspark.mllib.recommendation import ALS, Rating
    # data, rating and areaUnderCurve are defined elsewhere in the job
    data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
    functions = [rating]
    ranks = [10, 20]
    iterations = [10, 20]
    lambdas = [0.01, 0.1]
    alphas = [1.0, 50.0]
    results = []
    for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product(
            functions, ranks, iterations, lambdas, alphas):
        # train model
        ratings_train = data_train.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                  lambda_=float(m_lambda), alpha=float(m_alpha))
        # test performance on CV data
        ratings_cv = data_cv.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        auc = areaUnderCurve(ratings_cv, model.predictAll)
        # save results
        result = ",".join(str(l) for l in [ratingFunction.__name__, rank, numIterations, m_lambda, m_alpha, auc])
        results.append(result)
The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.
Re: Spark spark.shuffle.memoryFraction has no effect
Hi Andrew, I tried many different combinations, but there is still no change in the amount of shuffle bytes spilled to disk as shown in the UI. I made sure the configuration has been applied by checking Spark UI/Environment. I only see changes in shuffle bytes spilled if I disable spark.shuffle.spill.
On Jul 22, 2015, at 3:15 AM, Andrew Or and...@databricks.com wrote:
Hi, The setting of 0.2 / 0.6 looks reasonable to me. Since you are not using caching at all, have you tried something more extreme, like 0.1 / 0.9? Since disabling spark.shuffle.spill didn't cause an OOM, this setting should be fine. Also, one thing you could do is to verify the shuffle bytes spilled on the UI before and after the change. Let me know if that helped. -Andrew
2015-07-21 13:50 GMT-07:00 wdbaruni wdbar...@gmail.com:
Hi, I am testing Spark on Amazon EMR using Python and the basic wordcount example shipped with Spark. After running the application, I realized that in Stage 0, reduceByKey(add), around 2.5GB of shuffle data is spilled to memory and 4GB is spilled to disk. Since the wordcount example does not cache or persist any data, I thought I could increase the performance of this application by giving shuffle a larger memoryFraction. So, in spark-defaults.conf, I added the following:
    spark.storage.memoryFraction 0.2
    spark.shuffle.memoryFraction 0.6
However, I am still getting the same performance, and the same amount of shuffle data is being spilled to disk and memory. I validated that Spark is reading these configurations using Spark UI/Environment, and I can see my changes. Moreover, I tried setting spark.shuffle.spill to false, and I got the performance I am looking for, and all shuffle data was spilled to memory only. So, what am I getting wrong here, and why is the extra shuffle memory fraction not utilized?
My environment:
Amazon EMR with Spark 1.3.1, running using the -x argument
1 Master node: m3.xlarge
3 Core nodes: m3.xlarge
Application: wordcount.py
Input: 10 .gz files, 90MB each (~350MB unarchived), stored in S3
Submit command:
    /home/hadoop/spark/bin/spark-submit --deploy-mode client /mnt/wordcount.py s3n://input location
spark-defaults.conf:
    spark.eventLog.enabled false
    spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
    spark.driver.extraJavaOptions -Dspark.driver.log.level=INFO
    spark.master yarn
    spark.executor.instances 3
    spark.executor.cores 4
    spark.executor.memory 9404M
    spark.default.parallelism 12
    spark.eventLog.enabled true
    spark.eventLog.dir hdfs:///spark-logs/
    spark.storage.memoryFraction 0.2
    spark.shuffle.memoryFraction 0.6
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-spark-shuffle-memoryFraction-has-no-affect-tp23944.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
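As a rough sanity check on the settings above, the sketch below estimates how much JVM shuffle memory each task could use. It assumes the Spark 1.x model described in the ShuffleMemoryManager source comments: the pool is heap × spark.shuffle.memoryFraction × spark.shuffle.safetyFraction (default 0.8), shared by the N tasks running at once on an executor, each getting between 1/(2N) and 1/N of it. It treats spark.executor.memory as the usable heap, which slightly overstates what the JVM actually reports.

```python
def shuffle_budget_mb(executor_mb, concurrent_tasks, memory_fraction, safety_fraction=0.8):
    """Shuffle pool and per-task range in MB under the Spark 1.x model assumed above."""
    pool = executor_mb * memory_fraction * safety_fraction
    # With N concurrently running tasks, each may use between pool/(2N) and pool/N.
    return pool, pool / (2 * concurrent_tasks), pool / concurrent_tasks


# Values from the post: spark.executor.memory 9404M, spark.executor.cores 4.
for fraction in (0.2, 0.6):
    pool, low, high = shuffle_budget_mb(9404, 4, fraction)
    print(f"memoryFraction={fraction}: pool ~{pool:.0f} MB, per task ~{low:.0f}-{high:.0f} MB")
```

In this model, raising the fraction from 0.2 to 0.6 grows the per-executor pool from roughly 1.5 GB to roughly 4.5 GB, so an unchanged spill volume suggests the spilling is not bounded by this pool alone; the thread itself does not settle why.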
Re: Parquet problems
For what it's worth, my data set has around 85 columns in Parquet format as well. I have tried bumping the PermGen up to 512m, but I'm still getting errors in the driver thread.
On Wed, Jul 22, 2015 at 1:20 PM, Jerry Lam chiling...@gmail.com wrote:
Hi guys, I noticed that too. Anders, can you confirm that it works on the Spark 1.5 snapshot? This is what I tried in the end. It seems to be a 1.4 issue. Best Regards, Jerry
On Wed, Jul 22, 2015 at 11:46 AM, Anders Arpteg arp...@spotify.com wrote:
No, I never really resolved the problem, except by increasing the PermGen space, which only partially solved it. I still have to restart the job multiple times to make the whole job complete (it stores intermediate results). The Parquet data sources have about 70 columns, and yes Cheng, it works fine when only loading a small sample of the data. Thankful for any hints, Anders
On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian lian.cs@gmail.com wrote:
How many columns are there in these Parquet files? Could you load a small portion of the original large dataset successfully? Cheng
On 6/25/15 5:52 PM, Anders Arpteg wrote:
Yes, both the driver and the executors. It works a little bit better with more space, but there is still a leak that will cause failure after a number of reads. There are about 700 different data sources that need to be loaded, lots of data...
On Thu, 25 Jun 2015 08:02, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote:
Did you try increasing the PermGen for the driver?
Regards Sab
On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote:
When reading large (and many) datasets with the Spark 1.4.0 DataFrames Parquet reader (the org.apache.spark.sql.parquet format), the following exceptions are thrown:
    Exception in thread sk-result-getter-0
    Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread task-result-getter-0
    Exception in thread task-result-getter-3 java.lang.OutOfMemoryError: PermGen space
    Exception in thread task-result-getter-1 java.lang.OutOfMemoryError: PermGen space
    Exception in thread task-result-getter-2 java.lang.OutOfMemoryError: PermGen space
and many more like these from different threads. I've tried increasing the PermGen space using the -XX:MaxPermSize VM setting, but even after tripling the space, the same errors occur. I've also tried storing intermediate results, and am able to get the full job completed by running it multiple times and starting from the last successful intermediate result. There seems to be some memory leak in the Parquet format. Any hints on how to fix this problem? Thanks, Anders
Re: How to share a Map among RDDS?
Hi Andrew, If I broadcast the Map:
    val map2 = sc.broadcast(map1)
I get a compilation error:
    org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]] does not take parameters
    [error] val matchs= Vecs.map(term=>term.map{case (a,b)=>(map2(a),b)})
It seems it's still an RDD, so how do I access it by value=map2(key)? Thanks! Cheers, Dan
2015-07-22 2:20 GMT-05:00 Andrew Or and...@databricks.com:
Hi Dan, If the map is small enough, you can just broadcast it, can't you? It doesn't have to be an RDD. Here's an example of broadcasting an array and using it on the executors: https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala -Andrew
2015-07-21 19:56 GMT-07:00 ayan guha guha.a...@gmail.com:
Either you have to do rdd.collect and then broadcast, or you can do a join.
On 22 Jul 2015 07:54, Dan Dong dongda...@gmail.com wrote:
Hi, All, I am trying to access a Map from RDDs that are on different compute nodes, but without success. The Map is like:
    val map1 = Map(aa->1,bb->2,cc->3,...)
All RDDs will have to check against it to see if the key is in the Map or not, so it seems I have to make the Map itself global. The problem is that if the Map is stored as RDDs and spread across the different nodes, each node will only see a piece of the Map, and the info will not be complete to check against the Map (and then replace the key with the corresponding value), e.g.:
    val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})
But if the Map is not an RDD, how do I share it, like sc.broadcast(map1)? Any idea about this? Thanks! Cheers, Dan
Re: How to share a Map among RDDS?
Thanks Andrew, exactly.
2015-07-22 14:26 GMT-05:00 Andrew Or and...@databricks.com:
Hi Dan, `map2` is a broadcast variable, not your map. To access the map on the executors you need to do `map2.value(a)`. -Andrew
2015-07-22 12:20 GMT-07:00 Dan Dong dongda...@gmail.com:
Hi Andrew, If I broadcast the Map:
    val map2 = sc.broadcast(map1)
I get a compilation error:
    org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]] does not take parameters
    [error] val matchs= Vecs.map(term=>term.map{case (a,b)=>(map2(a),b)})
It seems it's still an RDD, so how do I access it by value=map2(key)? Thanks! Cheers, Dan
2015-07-22 2:20 GMT-05:00 Andrew Or and...@databricks.com:
Hi Dan, If the map is small enough, you can just broadcast it, can't you? It doesn't have to be an RDD. Here's an example of broadcasting an array and using it on the executors: https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala -Andrew
2015-07-21 19:56 GMT-07:00 ayan guha guha.a...@gmail.com:
Either you have to do rdd.collect and then broadcast, or you can do a join.
On 22 Jul 2015 07:54, Dan Dong dongda...@gmail.com wrote:
Hi, All, I am trying to access a Map from RDDs that are on different compute nodes, but without success. The Map is like:
    val map1 = Map(aa->1,bb->2,cc->3,...)
All RDDs will have to check against it to see if the key is in the Map or not, so it seems I have to make the Map itself global. The problem is that if the Map is stored as RDDs and spread across the different nodes, each node will only see a piece of the Map, and the info will not be complete to check against the Map (and then replace the key with the corresponding value), e.g.:
    val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})
But if the Map is not an RDD, how do I share it, like sc.broadcast(map1)? Any idea about this? Thanks! Cheers, Dan
Re: How to share a Map among RDDS?
Hi Dan, `map2` is a broadcast variable, not your map. To access the map on the executors you need to do `map2.value(a)`. -Andrew
2015-07-22 12:20 GMT-07:00 Dan Dong dongda...@gmail.com:
Hi Andrew, If I broadcast the Map:
    val map2 = sc.broadcast(map1)
I get a compilation error:
    org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,String]] does not take parameters
    [error] val matchs= Vecs.map(term=>term.map{case (a,b)=>(map2(a),b)})
It seems it's still an RDD, so how do I access it by value=map2(key)? Thanks! Cheers, Dan
2015-07-22 2:20 GMT-05:00 Andrew Or and...@databricks.com:
Hi Dan, If the map is small enough, you can just broadcast it, can't you? It doesn't have to be an RDD. Here's an example of broadcasting an array and using it on the executors: https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala -Andrew
2015-07-21 19:56 GMT-07:00 ayan guha guha.a...@gmail.com:
Either you have to do rdd.collect and then broadcast, or you can do a join.
On 22 Jul 2015 07:54, Dan Dong dongda...@gmail.com wrote:
Hi, All, I am trying to access a Map from RDDs that are on different compute nodes, but without success. The Map is like:
    val map1 = Map(aa->1,bb->2,cc->3,...)
All RDDs will have to check against it to see if the key is in the Map or not, so it seems I have to make the Map itself global. The problem is that if the Map is stored as RDDs and spread across the different nodes, each node will only see a piece of the Map, and the info will not be complete to check against the Map (and then replace the key with the corresponding value), e.g.:
    val matchs= Vecs.map(term=>term.map{case (a,b)=>(map1(a),b)})
But if the Map is not an RDD, how do I share it, like sc.broadcast(map1)? Any idea about this? Thanks! Cheers, Dan
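The distinction Andrew draws (the broadcast handle versus the map it carries) can be sketched without a cluster. The `Broadcast` class below is a hypothetical stand-in that only mimics the `.value` attribute that both the Scala and PySpark broadcast handles expose; the keys, values and `vecs` data are made up for illustration.

```python
class Broadcast:
    """Hypothetical stand-in for a broadcast handle: it wraps a value
    but is not itself the map, so it cannot be called or indexed."""

    def __init__(self, value):
        self.value = value


map1 = {1: "aa", 2: "bb", 3: "cc"}  # Map[Int, String], as in the compiler error
map2 = Broadcast(map1)

vecs = [[(1, 0.5), (3, 2.0)], [(2, 1.0)]]
# Look keys up through .value, the analogue of map2.value(a) in Scala.
matches = [[(map2.value[a], b) for a, b in term] for term in vecs]
print(matches)  # [[('aa', 0.5), ('cc', 2.0)], [('bb', 1.0)]]
```

Since Dan also wants to check whether a key is present, the Scala side may prefer `map2.value.getOrElse(a, default)` over `map2.value(a)`, which throws for a missing key.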
Re: spark.executor.memory and spark.driver.memory have no effect in yarn-cluster mode (1.4.x)?
That makes a lot of sense, thanks for the concise answer!
On Wed, Jul 22, 2015 at 4:10 PM, Andrew Or and...@databricks.com wrote:
Hi Michael, In general, driver-related properties should not be set through the SparkConf. This is because by the time the SparkConf is created, we have already started the driver JVM, so it's too late to change the memory, class paths and other properties. In cluster mode, executor-related properties should also not be set through the SparkConf. This is because the driver is run on the cluster just like the executors, and the executors are launched independently by whatever the cluster manager (e.g. YARN) is configured to do. The recommended way of setting these properties is either through the conf/spark-defaults.conf properties file, or through the spark-submit command line, e.g.:
    bin/spark-shell --master yarn --executor-memory 2g --driver-memory 5g
Let me know if that answers your question, -Andrew
2015-07-22 12:38 GMT-07:00 Michael Misiewicz mmisiew...@gmail.com:
Hi group, I seem to have encountered a weird problem with 'spark-submit' and manually setting SparkConf values in my applications. It seems that setting the configuration values spark.executor.memory and spark.driver.memory has no effect when they are set from within my application (i.e. prior to creating a SparkContext). In yarn-cluster mode, only the values specified on the command line via spark-submit for driver and executor memory are respected; if they are not specified there, Spark appears to fall back to the defaults.
For example, the correct behavior is noted in the driver's logs on YARN when --executor-memory is specified:
    15/07/22 19:25:59 INFO yarn.YarnAllocator: Will request 200 executor containers, each with 1 cores and 13824 MB memory including 1536 MB overhead
    15/07/22 19:25:59 INFO yarn.YarnAllocator: Container request (host: Any, capability: memory:13824, vCores:1)
But not when spark.executor.memory is specified prior to SparkContext initialization:
    15/07/22 19:22:22 INFO yarn.YarnAllocator: Will request 200 executor containers, each with 1 cores and 2560 MB memory including 1536 MB overhead
    15/07/22 19:22:22 INFO yarn.YarnAllocator: Container request (host: Any, capability: memory:2560, vCores:1)
In both cases, executor memory should be 10g. Interestingly, I set the parameter spark.yarn.executor.memoryOverhead, which appears to be respected whether I'm in yarn-cluster or yarn-client mode. Has anyone seen this before? Any idea what might be causing this behavior?
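The two YarnAllocator lines can be decoded with a subtraction, assuming the requested container size is simply the executor heap plus spark.yarn.executor.memoryOverhead (1536 MB, as both log lines state) with no other terms. Under that assumption, the SparkConf run leaves a 1024 MB heap, which matches the 1g default for spark.executor.memory in the 1.4 documentation and is consistent with Andrew's point that the in-application setting never reached the YARN allocator.

```python
OVERHEAD_MB = 1536  # spark.yarn.executor.memoryOverhead, as reported in both log lines


def executor_heap_mb(container_mb, overhead_mb=OVERHEAD_MB):
    """Executor heap implied by a container request: container size minus overhead."""
    return container_mb - overhead_mb


print(executor_heap_mb(13824))  # run with --executor-memory on the command line
print(executor_heap_mb(2560))   # run with spark.executor.memory set in SparkConf
```

The first request works out to 12288 MB (12 GB) of heap and the second to 1024 MB.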
Re: spark.deploy.spreadOut core allocation
Hi Srikanth, It does look like a bug. Did you set `spark.executor.cores` in your application by any chance? -Andrew
2015-07-22 8:05 GMT-07:00 Srikanth srikanth...@gmail.com:
Hello, I've set spark.deploy.spreadOut=false in spark-env.sh:
    export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.deploy.spreadOut=false"
There are 3 workers, each with 4 cores. Spark-shell was started with the number of cores set to 6. The Spark UI shows that one executor was used with 6 cores. Is this a bug? This is with Spark 1.4. Srikanth
Re: How to keep RDDs in memory between two different batch jobs?
Tachyon is one way. Also check out the Spark Job Server https://github.com/spark-jobserver/spark-jobserver . -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957p23958.html
Re: How to keep RDDs in memory between two different batch jobs?
I was about to say what the previous post said, so +1 to the previous post. From my understanding (gut feeling) of your requirement, it is very easy to do this with spark-job-server. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957p23960.html
Spark DataFrame created from JavaRDD<Row> copies all columns data into first column
Hi, I have a DataFrame which I need to convert into a JavaRDD and back to a DataFrame. I have the following code:
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    DataFrame sourceFrame = hiveContext.read().format("orc").load("/path/to/orc/file");
    // I do an order by on the above sourceFrame and then convert it into a JavaRDD
    JavaRDD<Row> modifiedRDD = sourceFrame.toJavaRDD().map(new Function<Row, Row>() {
        public Row call(Row row) throws Exception {
            if (row != null) {
                // updated row by creating a new Row
                return RowFactory.create(updateRow);
            }
            return null;
        }
    });
    // now I convert the above JavaRDD<Row> into a DataFrame using the following
    DataFrame modifiedFrame = sqlContext.createDataFrame(modifiedRDD, schema);
sourceFrame and modifiedFrame have the same schema. When I call sourceFrame.show(), the output is as expected: every column has its corresponding values and no column is empty. But when I call modifiedFrame.show(), all the column values get merged into the first column. For example, assume the source DataFrame has 3 columns as shown below:
    _col1  _col2  _col3
    ABC    10     DEF
    GHI    20     JKL
When I print modifiedFrame, which I converted from the JavaRDD, it shows the following:
    _col1        _col2  _col3
    ABC,10,DEF
    GHI,20,JKL
As shown above, _col1 has all the values, and _col2 and _col3 are empty. I don't know what I am doing wrong; please guide me. I am new to Spark. Thanks in advance.
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-DataFrame-created-from-JavaRDD-Row-copies-all-columns-data-into-first-column-tp23961.html
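One possible cause, which the post does not let us confirm because the type of `updateRow` is not shown: `RowFactory.create` takes varargs (`Object... values`), so passing a single object such as a comma-joined String or a List yields a one-field Row, whereas passing an `Object[]` with one element per schema column yields a full Row. The Python sketch below models only that varargs behavior; `create_row` is a hypothetical stand-in, not a Spark API.

```python
def create_row(*values):
    """Hypothetical stand-in for a varargs factory like RowFactory.create(Object... values)."""
    return tuple(values)


schema = ["_col1", "_col2", "_col3"]
updated = ["ABC", 10, "DEF"]

# One argument (here a comma-joined string) gives a one-field row.
joined = create_row(",".join(str(v) for v in updated))
# Spreading the values gives one field per schema column.
spread = create_row(*updated)

print(dict(zip(schema, joined)))  # {'_col1': 'ABC,10,DEF'}
print(dict(zip(schema, spread)))  # {'_col1': 'ABC', '_col2': 10, '_col3': 'DEF'}
```

If that is the cause, the Java equivalent of the second call would be building an `Object[]` of length `schema.fields().length` and passing it directly to `RowFactory.create`.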
Re: Performance issue with Spark's foreachPartition method
The first question I would ask is: have you determined whether you have a performance issue writing to Oracle? In particular, how many commits are you making? If you are issuing a lot of commits, that would be a performance problem. Robin
On 22 Jul 2015, at 19:11, diplomatic Guru diplomaticg...@gmail.com wrote:
Hello all, We are having a major performance issue with Spark, which is keeping us from going live. We have a job that carries out computation on log files and writes the results into an Oracle DB. The reducer 'reduceByKey' has been set to a parallelism of 4, as we don't want to establish too many DB connections. We are then calling foreachPartition on the RDD pairs that were reduced by key. Within this foreachPartition method we establish a DB connection, then iterate over the results, prepare the Oracle statement for batch insertion, then commit the batch and close the connection. All of this is working fine. However, when we execute the job to process 12GB of data, it takes forever to complete, especially at the foreachPartition stage. We submitted the job with 6 executors, 2 cores, and 6GB of memory, of which 0.3 is assigned to spark.storage.memoryFraction. The job takes about 50 minutes to complete, which is not ideal. I'm not sure how we could improve the performance.
I've provided the main body of the code; please take a look and advise.
From the driver:
    reduceResultsRDD.foreachPartition(new DB.InsertFunction(dbuser, dbpass, batchsize));
DB class:
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Iterator;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import scala.Tuple2;
    public class DB {
        private static final Logger logger = LoggerFactory.getLogger(DB.class);
        public static class InsertFunction implements VoidFunction<Iterator<Tuple2<String, String>>> {
            private static final long serialVersionUID = 55766876878L;
            private String dbuser = "";
            private String dbpass = "";
            private int batchsize;
            public InsertFunction(String dbuser, String dbpass, int batchsize) {
                super();
                this.dbuser = dbuser;
                this.dbpass = dbpass;
                this.batchsize = batchsize;
            }
            @Override
            public void call(Iterator<Tuple2<String, String>> results) {
                Connection connect = null;
                PreparedStatement pstmt = null;
                try {
                    connect = getDBConnection(dbuser, dbpass);
                    int count = 0;
                    if (batchsize <= 0) {
                        batchsize = 1;
                    }
                    pstmt = connect.prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
                    while (results.hasNext()) {
                        Tuple2<String, String> kv = results.next();
                        String[] data = kv._1.concat("," + kv._2).split(",");
                        pstmt.setString(1, data[0].toString());
                        pstmt.setString(2, data[1].toString());
                        ...
                        pstmt.addBatch();
                        count++;
                        if (count == batchsize) {
                            logger.info("BulkCount : " + count);
                            pstmt.executeBatch();
                            connect.commit();
                            count = 0;
                        }
                        pstmt.executeBatch();
                        connect.commit();
                    }
                    pstmt.executeBatch();
                    connect.commit();
                } catch (Exception e) {
                    logger.error("InsertFunction error: " + e.getMessage());
                } finally {
                    try {
                        if (pstmt != null) {
                            pstmt.close();
                        }
                        if (connect != null) {
                            connect.close();
                        }
                    } catch (SQLException e) {
                        logger.error("InsertFunction Connection Close error: " + e.getMessage());
                    }
                }
            }
        }
    }
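To put a number on Robin's question, the sketch below counts `commit()` calls per partition for the loop as posted, where `executeBatch()` and `commit()` also run after the `if` block on every record, versus committing only full batches plus one final flush. It assumes the brace placement in the post reflects the real code; the record count and batch size are hypothetical.

```python
def commits_as_posted(n_records, batchsize):
    """Count commit() calls made by the loop structure as posted."""
    commits, count = 0, 0
    for _ in range(n_records):
        count += 1
        if count == batchsize:
            commits += 1  # commit inside the if-block for a full batch
            count = 0
        commits += 1      # executeBatch()/commit() after the if-block, on every record
    return commits + 1    # final commit after the loop


def commits_batched(n_records, batchsize):
    """Count commits if only full batches plus one final flush are committed."""
    return n_records // batchsize + 1


print(commits_as_posted(10_000, 1_000))  # 10011
print(commits_batched(10_000, 1_000))    # 11
```

If the post is accurate, the per-record commit dominates, so moving or deleting the inner `executeBatch()`/`commit()` pair looks like the first thing worth checking.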
spark.executor.memory and spark.driver.memory have no effect in yarn-cluster mode (1.4.x)?
Hi group, I seem to have encountered a weird problem with 'spark-submit' and manually setting SparkConf values in my applications. It seems that setting the configuration values spark.executor.memory and spark.driver.memory has no effect when they are set from within my application (i.e. prior to creating a SparkContext). In yarn-cluster mode, only the values specified on the command line via spark-submit for driver and executor memory are respected; if they are not specified there, Spark appears to fall back to the defaults.
For example, the correct behavior is noted in the driver's logs on YARN when --executor-memory is specified:
    15/07/22 19:25:59 INFO yarn.YarnAllocator: Will request 200 executor containers, each with 1 cores and 13824 MB memory including 1536 MB overhead
    15/07/22 19:25:59 INFO yarn.YarnAllocator: Container request (host: Any, capability: memory:13824, vCores:1)
But not when spark.executor.memory is specified prior to SparkContext initialization:
    15/07/22 19:22:22 INFO yarn.YarnAllocator: Will request 200 executor containers, each with 1 cores and 2560 MB memory including 1536 MB overhead
    15/07/22 19:22:22 INFO yarn.YarnAllocator: Container request (host: Any, capability: memory:2560, vCores:1)
In both cases, executor memory should be 10g. Interestingly, I set the parameter spark.yarn.executor.memoryOverhead, which appears to be respected whether I'm in yarn-cluster or yarn-client mode. Has anyone seen this before? Any idea what might be causing this behavior?
databricks spark sql csv FAILFAST not failing, Spark 1.3.1 Java 7
Hi all, I am using the Databricks CSV library (https://github.com/databricks/spark-csv) to load some data into a DataFrame. I am trying to confirm that FAILFAST mode works correctly and aborts execution upon receiving an invalid CSV file, but I have not been able to see it fail yet after testing numerous invalid CSV files. Any advice? Spark 1.3.1 running on the MapR VM 4.1.0, Java 1.7.
    import java.util.HashMap;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.SQLContext;
    SparkConf conf = new SparkConf().setAppName("Dataframe testing");
    JavaSparkContext sc = new JavaSparkContext(conf);
    SQLContext sqlContext = new SQLContext(sc);
    HashMap<String, String> options = new HashMap<String, String>();
    options.put("header", "true");
    options.put("path", args[0]);
    options.put("mode", "FAILFAST");
    // partner data
    DataFrame partnerData = sqlContext.load("com.databricks.spark.csv", options);
    // register partnerData table in Spark SQL
    partnerData.registerTempTable("partnerData");
    partnerData.printSchema();
    partnerData.show();
It just runs as normal and outputs the data, even with an invalid CSV file. Thanks!
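The spark-csv README documents three values for `mode`: PERMISSIVE, DROPMALFORMED and FAILFAST. The sketch below is a simplified model of those modes for reasoning about test files, not the library's implementation, and it rests on one stated assumption: a line counts as malformed only when its token count differs from the header's. Under that assumption, a file can look invalid to a person yet still parse, because without a declared schema every column is read as a string by default. Also note that `show()` prints only the first 20 rows, so a malformed line further down may never be parsed by the job above.

```python
import csv
import io


def parse_csv(text, mode="PERMISSIVE"):
    """Simplified model of spark-csv parse modes (assumption: malformed = wrong token count)."""
    rows = list(csv.reader(io.StringIO(text)))
    header, body = rows[0], rows[1:]
    parsed = []
    for line_no, tokens in enumerate(body, start=2):  # line 1 is the header
        if len(tokens) == len(header):
            parsed.append(tokens)
        elif mode == "FAILFAST":
            raise ValueError(f"Malformed line {line_no} in FAILFAST mode: {tokens}")
        elif mode == "DROPMALFORMED":
            continue
        else:  # PERMISSIVE in this sketch: pad or truncate to the header width
            parsed.append((tokens + [None] * len(header))[:len(header)])
    return header, parsed


sample = "id,name\n1,alice\n2\n3,carol,extra\n"
print(parse_csv(sample, "DROPMALFORMED"))  # (['id', 'name'], [['1', 'alice']])
```

A useful test file for FAILFAST is therefore one whose rows have a different number of fields than the header, placed near the top of the file.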
Re: spark.deploy.spreadOut core allocation
Cool. Thanks! Srikanth
On Wed, Jul 22, 2015 at 3:12 PM, Andrew Or and...@databricks.com wrote:
Hi Srikanth, I was able to reproduce the issue by setting `spark.cores.max` to a number greater than the number of cores on a worker. I've filed SPARK-9260, which I believe is already being fixed in https://github.com/apache/spark/pull/7274. Thanks for reporting the issue! -Andrew
2015-07-22 11:49 GMT-07:00 Andrew Or and...@databricks.com:
Hi Srikanth, It does look like a bug. Did you set `spark.executor.cores` in your application by any chance? -Andrew
2015-07-22 8:05 GMT-07:00 Srikanth srikanth...@gmail.com:
Hello, I've set spark.deploy.spreadOut=false in spark-env.sh:
    export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.deploy.spreadOut=false"
There are 3 workers, each with 4 cores. Spark-shell was started with the number of cores set to 6. The Spark UI shows that one executor was used with 6 cores. Is this a bug? This is with Spark 1.4. Srikanth
Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hello again, In trying to understand the caching of intermediate RDDs by ALS, I looked into the source code and found what may be a bug. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230 you see that ALS.train() is being called with finalRDDStorageLevel = StorageLevel.NONE, which I would understand to mean that the intermediate RDDs will not be persisted. Looking here: https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631 unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE. This doesn’t make sense to me – I would expect the RDDs to be removed from the cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around. Jonathan
From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel
Hello all, I am running the Spark recommendation algorithm in MLlib and I have been studying its output with various model configurations. Ideally I would like to be able to run one job that trains the recommendation model with many different configurations to try to optimize for performance. A sample of the code in Python is copied below. The issue I have is that each new model that is trained caches a set of RDDs, and eventually the executors run out of memory. Is there any way in PySpark to unpersist() these RDDs after each iteration? The names of the RDDs, which I gathered from the UI, are: itemInBlocks, itemOutBlocks, Products, ratingBlocks, userInBlocks, userOutBlocks, users. I am using Spark 1.3. Thank you for any help!
Regards, Jonathan
    import itertools
    from pyspark.mllib.recommendation import ALS, Rating
    # data, rating and areaUnderCurve are defined elsewhere in the job
    data_train, data_cv, data_test = data.randomSplit([99, 1, 1], 2)
    functions = [rating]
    ranks = [10, 20]
    iterations = [10, 20]
    lambdas = [0.01, 0.1]
    alphas = [1.0, 50.0]
    results = []
    for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product(
            functions, ranks, iterations, lambdas, alphas):
        # train model
        ratings_train = data_train.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        model = ALS.trainImplicit(ratings_train, rank, numIterations,
                                  lambda_=float(m_lambda), alpha=float(m_alpha))
        # test performance on CV data
        ratings_cv = data_cv.map(lambda l: Rating(l.user, l.product, ratingFunction(l)))
        auc = areaUnderCurve(ratings_cv, model.predictAll)
        # save results
        result = ",".join(str(l) for l in [ratingFunction.__name__, rank, numIterations, m_lambda, m_alpha, auc])
        results.append(result)
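To see how quickly the cache can grow, the sketch below counts the configurations the posted loop trains and, assuming each run leaves all seven RDDs listed above cached (as the UI observation suggests), how many cached RDDs accumulate by the end. The rating function is replaced by a placeholder string because it is defined elsewhere in the original job.

```python
import itertools

# Hyperparameter lists copied from the post; "rating" is a placeholder for the real function.
functions = ["rating"]
ranks = [10, 20]
iterations = [10, 20]
lambdas = [0.01, 0.1]
alphas = [1.0, 50.0]

grid = list(itertools.product(functions, ranks, iterations, lambdas, alphas))

# RDD names reported in the UI for each trained model.
cached_per_model = ["itemInBlocks", "itemOutBlocks", "Products", "ratingBlocks",
                    "userInBlocks", "userOutBlocks", "users"]

print(len(grid))                          # models trained by the loop
print(len(grid) * len(cached_per_model))  # cached RDDs if none are ever released
```

The grid has 1 × 2 × 2 × 2 × 2 = 16 combinations, so under the stated assumption 16 × 7 = 112 RDDs end up cached, and every extra value added to one of the lists multiplies that count.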
Re: How to keep RDDs in memory between two different batch jobs?
Actually, I should clarify - Tachyon is a way to keep your data in RAM, but it's not exactly the same as keeping it cached in Spark. Spark Job Server is a way to keep it cached in Spark. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-RDDs-in-memory-between-two-different-batch-jobs-tp23957p23959.html
Re: spark streaming 1.3 issues
In Spark Streaming 1.3, say I have 10 executors, each with 4 cores, so in total 40 tasks run in parallel at once. If I repartition the Kafka direct stream to 40 partitions, versus keeping the 300 partitions the Kafka topic has, which one will be more efficient? Should I repartition the Kafka stream to match the number of cores or keep it at 300? If I have more partitions than parallel tasks, will that not cause task-scheduling overhead?
On Wed, Jul 22, 2015 at 11:37 AM, Tathagata Das t...@databricks.com wrote:
For Java, do
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
If you fix that error, you should be seeing data. You can call arbitrary RDD operations on a DStream, using DStream.transform. Take a look at the docs. For the direct Kafka approach you are using:
- tasks do get launched for empty partitions
- the driver may make multiple calls to the Kafka brokers to get all the offset information.
But that does not mean you should reduce partitions. The whole point of having a large number of partitions is to consume the data in parallel. If you reduce the number of partitions, that defeats the purpose of having partitions at all. And the driver making calls to get metadata (i.e. offsets) isn't very costly, nor is it usually a bottleneck. Rather, receiving and processing the actual data is usually the bottleneck, and to increase throughput you should have a larger number of partitions.
On Tue, Jul 21, 2015 at 1:02 AM, Akhil Das ak...@sigmoidanalytics.com wrote:
I'd suggest upgrading to 1.4, as it has better metrics and UI. Thanks Best Regards
On Mon, Jul 20, 2015 at 7:01 PM, Shushant Arora shushantaror...@gmail.com wrote:
Is coalesce not applicable to kafkaStream? How do I coalesce a Kafka direct stream? It's not in the API. Will calling repartition on the direct stream, with the number of executors as numPartitions, improve performance? In 1.3, do tasks get launched for partitions that are empty?
Does driver makes call for getting offsets of each partition separately or in single call it gets all partitions new offsets ? I mean will reducing no of partitions oin kafka help improving the performance? On Mon, Jul 20, 2015 at 4:52 PM, Shushant Arora shushantaror...@gmail.com wrote: Hi 1.I am using spark streaming 1.3 for reading from a kafka queue and pushing events to external source. I passed in my job 20 executors but it is showing only 6 in executor tab ? When I used highlevel streaming 1.2 - its showing 20 executors. My cluster is 10 node yarn cluster with each node has 8 cores. I am calling the script as : spark-submit --class classname --num-executors 10 --executor-cores 2 --master yarn-client jarfile 2. On Streaming UI Started at: Mon Jul 20 11:02:10 GMT+00:00 2015 Time since start: 13 minutes 28 seconds Network receivers: 0 Batch interval: 1 second Processed batches: 807 Waiting batches: 0 Received records: 0 Processed records: 0 Received records and processed records are always 0 . And Speed of processing is slow compare to highlevel api. I am procesing the stream using mapPartition. When I used directKafkaStream.foreachRDD(new FunctionJavaPairRDDbyte[],byte[], Void() { @Override public Void call(JavaPairRDDbyte[], byte[] rdd) throws Exception { // TODO Auto-generated method stub OffsetRange[] offsetRanges = ((HasOffsetRanges)rdd).offsetRanges(); } } It throws an exception java.lang.ClassCastException: org.apache.spark.api.java.JavaPairRDD cannot be cast to org.apache.spark.streaming.kafka.HasOffsetRanges Thanks Shushant
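A rough way to compare the two options in the question is to count scheduling "waves". The sketch below is plain Python arithmetic, not a Spark API; it assumes one task per core (the default spark.task.cpus=1) and tasks of similar duration, which will not hold exactly for skewed Kafka partitions.

```python
import math


def task_waves(num_partitions: int, total_cores: int) -> int:
    """Number of rounds of fully parallel tasks needed to run every partition once.

    Assumes one core per task and roughly equal task durations.
    """
    if num_partitions <= 0 or total_cores <= 0:
        raise ValueError("partitions and cores must be positive")
    return math.ceil(num_partitions / total_cores)


total_cores = 10 * 4  # 10 executors x 4 cores, as in the question
for partitions in (40, 300):
    print(f"{partitions} partitions -> {task_waves(partitions, total_cores)} wave(s)")
```

With 40 cores, 300 partitions need 8 waves versus 1 for 40 partitions, but each of the 300 tasks handles only about 40/300 as much data, so the added per-task scheduling cost is usually small next to the processing time, in line with the reply above.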
Re: spark.deploy.spreadOut core allocation
Hi Srikanth, I was able to reproduce the issue by setting `spark.cores.max` to a number greater than the number of cores on a worker. I've filed SPARK-9260, which I believe is already being fixed in https://github.com/apache/spark/pull/7274. Thanks for reporting the issue! -Andrew 2015-07-22 11:49 GMT-07:00 Andrew Or and...@databricks.com: Hi Srikanth, It does look like a bug. Did you set `spark.executor.cores` in your application by any chance? -Andrew 2015-07-22 8:05 GMT-07:00 Srikanth srikanth...@gmail.com: Hello, I've set spark.deploy.spreadOut=false in spark-env.sh: export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=4 -Dspark.deploy.spreadOut=false" There are 3 workers, each with 4 cores. spark-shell was started with the number of cores = 6. The Spark UI shows that one executor was used with 6 cores. Is this a bug? This is with Spark 1.4. Srikanth
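To make the expected behaviour concrete, here is a toy model of standalone core assignment. It is a simplified sketch under stated assumptions (at most one executor per worker, cores handed out one at a time), not the Master's actual scheduling code. Under either setting no worker should receive more cores than it has, so a single 6-core executor on a 4-core worker fits the bug described above.

```python
from typing import List


def allocate_cores(worker_free_cores: List[int], cores_max: int, spread_out: bool) -> List[int]:
    """Toy model: cores assigned on each worker (one executor per worker assumed)."""
    assigned = [0] * len(worker_free_cores)
    remaining = cores_max
    if spread_out:
        # Round-robin one core at a time across workers that still have room.
        while remaining > 0 and any(a < f for a, f in zip(assigned, worker_free_cores)):
            for i, free in enumerate(worker_free_cores):
                if remaining > 0 and assigned[i] < free:
                    assigned[i] += 1
                    remaining -= 1
    else:
        # Consolidate: fill each worker completely before moving to the next one.
        for i, free in enumerate(worker_free_cores):
            take = min(free, remaining)
            assigned[i] = take
            remaining -= take
    return assigned


print(allocate_cores([4, 4, 4], cores_max=6, spread_out=False))  # [4, 2, 0]
print(allocate_cores([4, 4, 4], cores_max=6, spread_out=True))   # [2, 2, 2]
```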
Re: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:)
Hi, I’m stuck with the same issue, but I see org.apache.hadoop.fs.s3native.NativeS3FileSystem in hadoop-core:1.0.4 (that’s the hadoop-client version I currently use), and as far as I can tell it is a transitive dependency that comes from Spark itself. I’m using a custom build of Spark 1.3.1 with hadoop-client 1.0.4. [INFO] +- org.apache.spark:spark-core_2.10:jar:1.3.1-hadoop-client-1.0.4:provided ... [INFO] | +- org.apache.hadoop:hadoop-client:jar:1.0.4:provided [INFO] | | \- org.apache.hadoop:hadoop-core:jar:1.0.4:provided The thing is, I don’t have any direct usages of any hadoop-client version, so in my understanding I should be able to run my jar on any version of Spark (1.3.1 with hadoop-client 2.2.0 up to 2.2.6, or 1.3.1 with hadoop-client 1.0.4 up to 1.2.1), but in reality, running it on a live cluster, I’m getting a ClassNotFoundException. I’ve checked the über-jar of Spark itself, and NativeS3FileSystem is there, so I don’t really understand where it comes from. java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074) I’ve just had an idea. Is it possible that the executor’s classpath is different from the worker classpath? How can I check the executor’s classpath? On 23 Apr 2015, at 17:35, Ted Yu yuzhih...@gmail.com wrote: The NativeS3FileSystem class is in the hadoop-aws jar. It looks like it was not on the classpath. Cheers On Thu, Apr 23, 2015 at 7:30 AM, Sujee Maniyam su...@sujee.net wrote: Thanks all... btw, s3n load works without any issues with spark-1.3.1-built-for-hadoop 2.4. I tried this on 1.3.1-hadoop26:
sc.hadoopConfiguration.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
val f = sc.textFile("s3n://bucket/file")
f.count
No, it can't find the implementation path. Looks like some jar is missing?
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3native.NativeS3FileSystem not found at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074) at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) On Wednesday, April 22, 2015, Shuai Zheng szheng.c...@gmail.com wrote: Below is my code to access s3n without problems (only for 1.3.1; there is a bug in 1.3.0):
Configuration hadoopConf = ctx.hadoopConfiguration();
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
hadoopConf.set("fs.s3n.awsAccessKeyId", awsAccessKeyId);
hadoopConf.set("fs.s3n.awsSecretAccessKey", awsSecretAccessKey);
Regards, Shuai From: Sujee Maniyam [mailto:su...@sujee.net] Sent: Wednesday, April 22, 2015 12:45 PM To: Spark User List Subject: spark 1.3.1 : unable to access s3n:// urls (no file system for scheme s3n:) Hi all, I am unable to access s3n:// URLs using sc.textFile(); I'm getting a 'no file system for scheme s3n://' error. Is it a bug, or are some config settings missing?
See below for details: env variables: AWS_SECRET_ACCESS_KEY=set AWS_ACCESS_KEY_ID=set spark/RELEASE: Spark 1.3.1 (git revision 908a0bf) built for Hadoop 2.6.0 Build flags: -Phadoop-2.4 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver -Pyarn -DzincPort=3034
./bin/spark-shell
val f = sc.textFile("s3n://bucket/file")
f.count
error== java.io.IOException: No FileSystem for scheme: s3n at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2584) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296) at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:256) at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228) at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:313) at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:203) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:219) at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:217) at scala.Option.getOrElse(Option.scala:120) at
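One way to answer "which jar, if any, actually contains the class" is to scan the jars on a classpath directly. The sketch below uses only Python's zipfile module; how you obtain the executor's classpath is up to you (for example, in standalone mode the executor launch command is typically written at the top of the executor's stderr log, which is worth verifying on your setup). Wildcard entries such as lib/* are not expanded here.

```python
import os
import zipfile
from typing import Iterable, List


def jars_containing_class(jar_paths: Iterable[str], class_name: str) -> List[str]:
    """Return the entries of `jar_paths` whose jar contains `class_name`.

    `class_name` is fully qualified, e.g. "org.example.Foo". Directories,
    missing files and non-zip files are skipped.
    """
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for path in jar_paths:
        if not os.path.isfile(path):
            continue  # directory or missing entry on the classpath
        try:
            with zipfile.ZipFile(path) as jar:
                if entry in jar.namelist():
                    hits.append(path)
        except zipfile.BadZipFile:
            continue  # not a jar/zip archive
    return hits


def split_classpath(classpath: str) -> List[str]:
    """Split a classpath string on the platform separator (':' or ';')."""
    return [p for p in classpath.split(os.pathsep) if p]
```

Calling jars_containing_class(split_classpath(cp), "org.apache.hadoop.fs.s3native.NativeS3FileSystem") and getting an empty list would be consistent with Ted's explanation that the hadoop-aws jar is missing from that classpath.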
Re: Which memory fraction is Spark using to compute RDDs that are not going to be persisted
Hi, It would be whatever's left in the JVM. This is not explicitly controlled by a fraction like storage or shuffle. However, the computation usually doesn't need to use that much space. In my experience it's almost always the caching or the aggregation during shuffles that's the most memory intensive. -Andrew 2015-07-21 13:47 GMT-07:00 wdbaruni wdbar...@gmail.com: I am new to Spark and I understand that Spark divides the executor memory into the following fractions: *RDD Storage:* which Spark uses to store persisted RDDs using .persist() or .cache(); it can be defined by setting spark.storage.memoryFraction (default 0.6). *Shuffle and aggregation buffers:* which Spark uses to store shuffle outputs; it can be defined using spark.shuffle.memoryFraction. If shuffle output exceeds this fraction, Spark will spill data to disk (default 0.2). *User code:* Spark uses this fraction to execute arbitrary user code (default 0.2). I am not mentioning the storage and shuffle safety fractions for simplicity. My question is, which memory fraction is Spark using to compute and transform RDDs that are not going to be persisted? For example:
from operator import add
lines = sc.textFile("i am a big file.txt")
count = lines.flatMap(lambda x: x.split(' ')).map(lambda x: (x, 1)).reduceByKey(add)
count.saveAsTextFile("output")
Here Spark will not load the whole file at once; it will partition the input file and do all these transformations per partition in a single stage. However, which memory fraction will Spark use to load the partitioned lines and compute flatMap() and map()? Thanks -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Which-memory-fraction-is-Spark-using-to-compute-RDDs-that-are-not-going-to-be-persisted-tp23942.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
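As a toy model of Andrew's point (plain Python generators, not Spark's actual classes), the narrow flatMap and map steps can be pictured as iterators chained inside one task: each record flows through and is discarded. Only the per-partition aggregation for reduceByKey keeps state, and that state grows with the number of distinct words rather than with the input size.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, Tuple


def words(lines: Iterable[str]) -> Iterator[str]:
    """flatMap step: yields one word at a time; nothing is buffered."""
    for line in lines:
        yield from line.split(" ")


def pairs(ws: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """map step: still lazy, one (word, 1) record at a time."""
    for w in ws:
        yield (w, 1)


def combine(kvs: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Map-side combine for reduceByKey: the only structure that grows,
    with one entry per distinct word in the partition."""
    counts: Dict[str, int] = defaultdict(int)
    for k, v in kvs:
        counts[k] += v
    return dict(counts)


partition = iter(["a b a", "b c"])  # stands in for one input split
print(combine(pairs(words(partition))))  # {'a': 2, 'b': 2, 'c': 1}
```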
Re: Does Spark streaming support is there with RabbitMQ
Hi, We tested this receiver internally in Stratio Sparkta, and it works fine. If you try the receiver, we're open to your collaboration; your issues will be welcome. Regards A.Rincón Stratio software architect 2015-07-22 8:15 GMT+02:00 Tathagata Das t...@databricks.com: You could contact the authors of the spark-packages; maybe that will help? On Mon, Jul 20, 2015 at 6:41 AM, Jeetendra Gangele gangele...@gmail.com wrote: Thanks Todd, I'm not sure whether somebody has used it or not. Can somebody confirm whether this integrates nicely with Spark Streaming? On 20 July 2015 at 18:43, Todd Nist tsind...@gmail.com wrote: There is one package available on the spark-packages site, http://spark-packages.org/package/Stratio/RabbitMQ-Receiver The source is here: https://github.com/Stratio/RabbitMQ-Receiver Not sure whether that meets your needs or not. -Todd On Mon, Jul 20, 2015 at 8:52 AM, Jeetendra Gangele gangele...@gmail.com wrote: Does Apache Spark support RabbitMQ? I have messages on RabbitMQ and I want to process them using Apache Spark Streaming. Does it scale? Regards Jeetendra
Re: many-to-many join
If I understand this correctly, you could join area_code_user and area_code_state and then flatMap to get (user, area code, state). Then group/reduce by user. You can also try some join optimizations, like partitioning on area code or broadcasting the smaller table, depending on the size of area_code_state. On Jul 22, 2015 10:15 AM, John Berryman jo...@eventbrite.com wrote: Quick example problem that's stumping me: * Users have 1 or more phone numbers and therefore one or more area codes. * There are 100M users. * States have one or more area codes. * I would like to find the states for the users (as indicated by phone area code). I was thinking about something like this: If area_code_user looks like (area_code, [user_id]) ex: (615, [1234567]) and area_code_state looks like (area_code, state) ex: (615, ['Tennessee']) then we could do
states_and_users_mixed = area_code_user.join(area_code_state) \
    .reduceByKey(lambda a, b: a + b) \
    .values()
user_state_pairs = states_and_users_mixed.flatMap(emit_cartesian_prod_of_userids_and_states)
user_to_states = user_state_pairs.reduceByKey(lambda a, b: a + b)
user_to_states.first()
(1234567, ['Tennessee', 'Tennessee', 'California'])
This would work, but user_state_pairs is just a list of user_ids and state names mixed together, and emit_cartesian_prod_of_userids_and_states has to correctly pair them. This is problematic because 1) it's weird and sloppy and 2) there will be lots of users per state, and having so many users in a single row is going to make emit_cartesian_prod_of_userids_and_states work extra hard to first locate states and then emit all userid-state pairs. How should I be doing this? Thanks, -John
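Sonal's suggestion can be sketched in plain Python to show the shape of the data at each step. The assumption here is that area_code_user is exploded to one (area_code, user_id) record per phone number instead of (area_code, [user_ids]); with that layout the join already yields flat (user_id, state) pairs, so no cartesian-product helper is needed. In PySpark the same pipeline would roughly be area_code_user.join(area_code_state).values().distinct().groupByKey(); the functions below only mimic those steps on local lists.

```python
from collections import defaultdict
from typing import Dict, Hashable, Iterable, Iterator, List, Set, Tuple

Pair = Tuple[Hashable, Hashable]


def join(left: Iterable[Pair], right: Iterable[Pair]) -> Iterator[Tuple[Hashable, Pair]]:
    """Inner join of (key, value) records, shaped like RDD.join's output."""
    index: Dict[Hashable, List[Hashable]] = defaultdict(list)
    for key, value in right:
        index[key].append(value)
    for key, left_value in left:
        for right_value in index.get(key, []):
            yield key, (left_value, right_value)


def states_per_user(area_code_user: Iterable[Pair],
                    area_code_state: Iterable[Pair]) -> Dict[Hashable, Set[Hashable]]:
    # join on area code -> (area_code, (user_id, state))
    joined = join(area_code_user, area_code_state)
    # values(): drop the area code, keeping (user_id, state) pairs
    user_state = (pair for _, pair in joined)
    # distinct() + groupByKey(): a de-duplicated set of states per user
    result: Dict[Hashable, Set[Hashable]] = defaultdict(set)
    for user_id, state in user_state:
        result[user_id].add(state)
    return dict(result)


area_code_user = [(615, 1234567), (720, 1234567), (615, 42)]  # one record per phone number
area_code_state = [(615, "Tennessee"), (720, "Colorado")]
print(states_per_user(area_code_user, area_code_state))
```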
Re: Spark spark.shuffle.memoryFraction has no effect
Hi, The setting of 0.2 / 0.6 looks reasonable to me. Since you are not using caching at all, have you tried something more extreme, like 0.1 / 0.9? Since disabling spark.shuffle.spill didn't cause an OOM, this setting should be fine. Also, one thing you could do is verify the shuffle bytes spilled on the UI before and after the change. Let me know if that helped. -Andrew 2015-07-21 13:50 GMT-07:00 wdbaruni wdbar...@gmail.com: Hi I am testing Spark on Amazon EMR using Python and the basic wordcount example shipped with Spark. After running the application, I realized that in Stage 0 reduceByKey(add), around 2.5GB of shuffle data is spilled to memory and 4GB of shuffle data is spilled to disk. Since in the wordcount example I am not caching or persisting any data, I thought I could increase the performance of this application by giving it a larger shuffle memoryFraction. So, in spark-defaults.conf, I added the following: spark.storage.memoryFraction 0.2 spark.shuffle.memoryFraction 0.6 However, I am still getting the same performance, and the same amount of shuffle data is being spilled to disk and memory. I validated that Spark is reading these configurations using the Environment tab of the Spark UI, and I can see my changes. Moreover, I tried setting spark.shuffle.spill to false, and I got the performance I was looking for, with all shuffle data spilled to memory only. So, what am I getting wrong here, and why isn't the extra shuffle memory fraction being utilized?
*My environment:* Amazon EMR with Spark 1.3.1 running using -x argument 1 Master node: m3.xlarge 3 Core nodes: m3.xlarge Application: wordcount.py Input: 10 .gz files 90MB each (~350MB unarchived) stored in S3 *Submit command:* /home/hadoop/spark/bin/spark-submit --deploy-mode client /mnt/wordcount.py s3n://input location *spark-defaults.conf:*
spark.eventLog.enabled false
spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70
spark.driver.extraJavaOptions -Dspark.driver.log.level=INFO
spark.master yarn
spark.executor.instances 3
spark.executor.cores 4
spark.executor.memory 9404M
spark.default.parallelism 12
spark.eventLog.enabled true
spark.eventLog.dir hdfs:///spark-logs/
spark.storage.memoryFraction 0.2
spark.shuffle.memoryFraction 0.6
-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-spark-shuffle-memoryFraction-has-no-affect-tp23944.html Sent from the Apache Spark User List mailing list archive at Nabble.com.
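A back-of-envelope check of the pools under this configuration, as a sketch. It assumes the legacy (pre-1.6) memory model: the shuffle pool is heap times spark.shuffle.memoryFraction times spark.shuffle.safetyFraction (default 0.8), the storage pool uses spark.storage.safetyFraction (default 0.9), and the N concurrently running tasks share the shuffle pool, each allowed up to 1/N and meant to get at least 1/(2N). It also treats spark.executor.memory as the usable heap, which slightly overstates it.

```python
def legacy_memory_pools(heap_mb: float,
                        storage_fraction: float = 0.6,
                        shuffle_fraction: float = 0.2,
                        storage_safety: float = 0.9,
                        shuffle_safety: float = 0.8,
                        concurrent_tasks: int = 1) -> dict:
    """Approximate legacy (pre-1.6) Spark pool sizes in MB for one executor."""
    storage = heap_mb * storage_fraction * storage_safety
    shuffle = heap_mb * shuffle_fraction * shuffle_safety
    return {
        "storage_mb": storage,
        "shuffle_mb": shuffle,
        # each of N running tasks may grow up to 1/N of the shuffle pool ...
        "shuffle_per_task_max_mb": shuffle / concurrent_tasks,
        # ... and is meant to get at least 1/(2N) before being forced to spill
        "shuffle_per_task_min_mb": shuffle / (2 * concurrent_tasks),
    }


for label, storage_f, shuffle_f in [("defaults", 0.6, 0.2), ("question", 0.2, 0.6)]:
    pools = legacy_memory_pools(9404, storage_f, shuffle_f, concurrent_tasks=4)
    print(label, {k: round(v) for k, v in pools.items()})
```

With 4 cores per executor, the change should raise each task's JVM-side shuffle share from roughly 376 MB to roughly 1128 MB at most. One possibility worth checking (an assumption, not confirmed in this thread): in a Python job, the reduceByKey aggregation largely runs in the Python worker, whose spilling is governed by spark.python.worker.memory rather than spark.shuffle.memoryFraction. That could explain why the JVM-side fraction made no visible difference while disabling spark.shuffle.spill did.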
Re: How to share a Map among RDDS?
Hi Dan, If the map is small enough, you can just broadcast it, can't you? It doesn't have to be an RDD. Here's an example of broadcasting an array and using it on the executors: https://github.com/apache/spark/blob/c03299a18b4e076cabb4b7833a1e7632c5c0dabe/examples/src/main/scala/org/apache/spark/examples/BroadcastTest.scala -Andrew 2015-07-21 19:56 GMT-07:00 ayan guha guha.a...@gmail.com: Either you have to do rdd.collect and then broadcast, or you can do a join. On 22 Jul 2015 07:54, Dan Dong dongda...@gmail.com wrote: Hi, All, I am trying to access a Map from RDDs that are on different compute nodes, but without success. The Map is like: val map1 = Map("aa" -> 1, "bb" -> 2, "cc" -> 3, ...) All RDDs will have to check against it to see if the key is in the Map or not, so it seems I have to make the Map itself global. The problem is that if the Map is stored as an RDD and spread across the different nodes, each node will only see a piece of the Map, and the info will not be complete to check against the Map (and then replace the key with the corresponding value). E.g.: val matchs = Vecs.map(term => term.map { case (a, b) => (map1(a), b) }) But if the Map is not an RDD, how do I share it, e.g. with sc.broadcast(map1)? Any idea about this? Thanks! Cheers, Dan
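A sketch of the per-record function that would run on the executors once the map is broadcast, written in Python for illustration: in PySpark you would create bc = sc.broadcast(map1) on the driver and call replace_keys(term, bc.value) inside the map. Unlike map1(a) in the question, which throws when a key is missing, this version drops unknown keys; that choice is an assumption about the desired behaviour.

```python
from typing import Dict, Iterable, List, Tuple, TypeVar

V = TypeVar("V")


def replace_keys(term: Iterable[Tuple[str, V]], lookup: Dict[str, int]) -> List[Tuple[int, V]]:
    """Swap each key for its value in `lookup`, dropping keys it does not contain.

    In PySpark, `lookup` would be `bc.value` for `bc = sc.broadcast(map1)`,
    so every executor reads its own local copy of the whole map.
    """
    return [(lookup[a], b) for a, b in term if a in lookup]


map1 = {"aa": 1, "bb": 2, "cc": 3}
print(replace_keys([("aa", 0.5), ("zz", 1.0), ("cc", 2.0)], map1))  # [(1, 0.5), (3, 2.0)]
```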
Re: How to restart Twitter spark stream
That was pseudocode; a working version would look like this:
val stream = TwitterUtils.createStream(ssc, None)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))).map(x => (x.toLowerCase, 1))
val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false)).map(x => x._2)
topCounts10.print()
val filteredStream = topCounts10.transform(rdd => {
  val samplehashtags = ssc.sparkContext.parallelize(Array("#RobinWilliams".toLowerCase, "#android".toLowerCase, "#iphone".toLowerCase))
  val newRDD = samplehashtags.map { x => (x, 1) }
  val joined = newRDD.join(rdd)
  joined
})
filteredStream.print()
Thanks Best Regards On Wed, Jul 22, 2015 at 3:58 AM, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi Akhil and Jorn, I tried, as you suggested, to create a simple scenario, but I get an error on rdd.join(newRDD): value join is not a member of org.apache.spark.rdd.RDD[twitter4j.Status]. The code looks like:
val stream = TwitterUtils.createStream(ssc, auth)
val filteredStream = stream.transform(rdd => {
  val samplehashtags = Array("music", "film")
  val newRDD = samplehashtags.map { x => (x, 1) }
  rdd.join(newRDD)
})
Did I miss something here? Thanks, Zoran On Mon, Jul 20, 2015 at 9:54 AM, Zoran Jeremic zoran.jere...@gmail.com wrote: Thanks for the explanation. If I understand this correctly, in this approach I would actually stream everything from Twitter and perform filtering in my application using Spark. Isn't this too much overhead if my application is interested in listening for a couple of hundred or thousand hashtags? On one hand, this would be a better approach, since I would not have the problem of opening new streams if the number of hashtags goes over 400, which is the Twitter limit for user stream filtering; on the other hand, I'm concerned about how much it will affect application performance if I stream everything that is posted on Twitter and filter it locally.
It would be great if somebody with experience in this could comment on these concerns. Thanks, Zoran On Mon, Jul 20, 2015 at 12:19 AM, Akhil Das ak...@sigmoidanalytics.com wrote: Jorn meant something like this:
val filteredStream = twitterStream.transform(rdd => {
  val newRDD = ssc.sparkContext.textFile("/this/file/will/be/updated/frequently").map(x => (x, 1))
  rdd.join(newRDD)
})
newRDD will work like a filter when you do the join. Thanks Best Regards On Sun, Jul 19, 2015 at 9:32 PM, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi Jorn, I didn't know that it is possible to change the filter without reopening the Twitter stream. Actually, I already asked that question earlier on Stack Overflow (http://stackoverflow.com/questions/30960984/apache-spark-twitter-streaming) and got the answer that it's not possible, but it would be even better if there were some other way to add new hashtags or to remove old hashtags that a user stopped following. I guess the second request would be more difficult. However, it would be great if you could give me a short example of how to do this. I didn't understand well from your explanation what you mean by "join it with an RDD loading the newest hashtags from disk at a regular interval". Thanks, Zoran On Sun, Jul 19, 2015 at 5:01 AM, Jörn Franke jornfra...@gmail.com wrote: Why do you even want to stop it? You can join it with an RDD that loads the newest hashtags from disk at a regular interval. On Sun, Jul 19, 2015 at 7:40, Zoran Jeremic zoran.jere...@gmail.com wrote: Hi, I have a Twitter Spark stream initialized in the following way:
val ssc: StreamingContext = SparkLauncher.getSparkScalaStreamingContext()
val config = getTwitterConfigurationBuilder.build()
val auth: Option[twitter4j.auth.Authorization] = Some(new twitter4j.auth.OAuthAuthorization(config))
val stream = TwitterUtils.createStream(ssc, auth, filters)
This works fine when I initially start it. However, at some point I need to update the filters, since users might add new hashtags they want to follow.
I tried to stop the running stream and the Spark streaming context without stopping the Spark context, e.g.:
stream.stop()
ssc.stop(false)
Afterward, I'm trying to initialize a new Twitter stream like I did previously. However, I got this exception: Exception in thread "Firestorm JMX Monitor" java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after stopping a context is not supported at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:224) at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64) at org.apache.spark.streaming.dstream.InputDStream.<init>(InputDStream.scala:41) at
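The join-as-filter idea from Jörn and Akhil boils down to this: on every batch, use the current set of tracked hashtags and keep only matching tweets, so the tracked set can change without restarting the stream. The plain-Python function below sketches that per-batch step. The regex-based hashtag extraction and case-insensitive matching are assumptions (the thread splits on spaces), and in Spark this logic would run inside transform or foreachRDD on each batch's RDD rather than over a local list.

```python
import re
from collections import Counter
from typing import Iterable, Set

HASHTAG = re.compile(r"#\w+")  # assumption: a hashtag is '#' followed by word characters


def tracked_hashtag_counts(texts: Iterable[str], tracked: Set[str]) -> Counter:
    """Count occurrences of tracked hashtags, case-insensitively.

    Intended to be called once per batch with the latest `tracked` set,
    e.g. freshly re-read from a file, so additions and removals apply
    to the next batch.
    """
    wanted = {t.lower() for t in tracked}
    counts: Counter = Counter()
    for text in texts:
        for tag in HASHTAG.findall(text):
            tag = tag.lower()
            if tag in wanted:
                counts[tag] += 1
    return counts


batch = ["Loving my #iPhone", "#Android vs #iphone", "no tags here", "#music"]
print(tracked_hashtag_counts(batch, {"#iphone", "#android"}))
```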
Mesos + Spark
Hi guys! I'm new to Mesos. I have two Spark applications (one streaming and one batch), and I want to run both apps in a Mesos cluster. For testing, I want to run it in Docker containers, so I started a simple redjack/mesos-master, but a lot of things are unclear to me (both Mesos and Spark on Mesos). If I have a Mesos cluster (for testing it will be some Docker containers), do I need a separate machine (container) to run my Spark job? Or can I submit to the cluster and schedule it (with Chronos or something else)? How can I run the streaming job? What happens if the controller dies? Or if I call spark-submit with master=mesos, does my application start so that I can forget about it? How can I run it every 10 minutes without submitting it every 10 minutes? How can I run my streaming app in HA mode? Thanks b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com
Re: user threads in executors
Yes, you could unroll from the iterator in batches of 100-200 and then post them in multiple rounds. If you are using the Kafka receiver-based approach (not Direct), then the raw Kafka data is stored in the executor memory. If you are using Direct Kafka, then it is read from Kafka directly at the time of filtering. TD On Tue, Jul 21, 2015 at 9:34 PM, Shushant Arora shushantaror...@gmail.com wrote: I can post multiple items at a time. Data is read from Kafka and filtered, and after that it's posted. Does foreachPartition load the complete partition into memory, or does it use an iterator under the hood? If the complete batch is not loaded, will posting requests in custom-sized batches of 100-200 help, instead of posting the whole partition? On Wed, Jul 22, 2015 at 12:18 AM, Tathagata Das t...@databricks.com wrote: If you can post multiple items at a time, then use foreachPartition to post the whole partition in a single request. On Tue, Jul 21, 2015 at 9:35 AM, Richard Marscher rmarsc...@localytics.com wrote: You can certainly create threads in a map transformation. We do this to do concurrent DB lookups during one stage, for example. I would recommend, however, that you switch from map to mapPartitions, as this allows you to create a fixed-size thread pool to share across items in a partition, as opposed to spawning a future per record in the RDD, for example. On Tue, Jul 21, 2015 at 4:11 AM, Shushant Arora shushantaror...@gmail.com wrote: Hi, Can I create user threads in executors? I have a streaming app where, after processing, I have a requirement to push events to an external system. Each post request costs ~90-100 ms. To make the posts parallel, I cannot use the same thread, because that is limited by the number of cores available in the system. Can I use user threads in a Spark app? I tried to create 2 threads in a map task and it worked. Is there any upper limit on the number of user threads in a Spark executor? Is it a good idea to create user threads in a Spark map task?
Thanks -- *Richard Marscher* Software Engineer Localytics Localytics.com http://localytics.com/ | Our Blog http://localytics.com/blog | Twitter http://twitter.com/localytics | Facebook http://facebook.com/localytics | LinkedIn http://www.linkedin.com/company/1148792?trk=tyah
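TD's and Richard's suggestions combined, as a sketch: consume the partition iterator in fixed-size batches and post them from a small, fixed thread pool, submitting at most max_workers batches per round so memory stays bounded. post_batch is a hypothetical stand-in for your HTTP client call; in PySpark you would pass something like lambda it: post_partition(it, post_batch) to rdd.foreachPartition.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import islice
from typing import Callable, Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def batches(records: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield lists of at most `size` records without materialising the input."""
    it = iter(records)
    while True:
        chunk = list(islice(it, size))
        if not chunk:
            return
        yield chunk


def post_partition(records: Iterable[T], post_batch: Callable[[List[T]], object],
                   batch_size: int = 100, max_workers: int = 4) -> int:
    """Post records in batches from a fixed-size thread pool; returns batches posted.

    At most `max_workers` batches are in flight per round, so only about
    batch_size * max_workers records are held in memory at once.
    """
    posted = 0
    pending = batches(records, batch_size)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while True:
            round_ = [pool.submit(post_batch, b) for b in islice(pending, max_workers)]
            if not round_:
                return posted
            for future in round_:
                future.result()  # re-raises any exception from post_batch
                posted += 1


sizes = []
print(post_partition(range(250), lambda b: sizes.append(len(b)), batch_size=100, max_workers=2))  # 3
```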
Re: spark streaming 1.3 coalesce on kafkadirectstream
With DirectKafkaStream there are two approaches: 1. Increase the number of Kafka partitions; Spark will automatically read them in parallel. 2. If that's not possible, then explicitly repartition, but only if there are more cores in the cluster than Kafka partitions AND the first map-like stage on the directKafkaDStream is heavy enough to warrant the cost of repartitioning (which shuffles the data around). On Mon, Jul 20, 2015 at 8:31 PM, Shushant Arora shushantaror...@gmail.com wrote: Does Spark Streaming 1.3 launch a task for each partition offset range, whether that range is empty or not? If yes, how can I force it not to launch tasks for empty RDDs? I am not able to use coalesce on directKafkaStream. Should we always repartition before processing the direct stream? The use case is:
directKafkaStream.repartition(numexecutors).mapPartitions(new FlatMapFunction<Iterator<Tuple2<byte[], byte[]>>, String>() { ... }
Thanks
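TD's rule of thumb, restated as a tiny decision function. This only encodes the advice in the reply; whether the first stage is "heavy" is a judgement call you supply, and using the total core count as the target partition count is an assumption.

```python
def should_repartition(kafka_partitions: int, total_cores: int, first_stage_is_heavy: bool) -> bool:
    """Repartition only if cores would otherwise sit idle AND the work justifies a shuffle."""
    return total_cores > kafka_partitions and first_stage_is_heavy


def target_partitions(kafka_partitions: int, total_cores: int, first_stage_is_heavy: bool) -> int:
    """Partition count to use downstream of the direct stream under the rule above."""
    if should_repartition(kafka_partitions, total_cores, first_stage_is_heavy):
        return total_cores  # assumption: one partition per core
    return kafka_partitions  # keep the 1:1 Kafka-partition-to-RDD-partition mapping


print(target_partitions(300, 40, True))  # 300: already more partitions than cores
print(target_partitions(10, 40, True))   # 40
```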
Re: Spark-hive parquet schema evolution
Since Hive doesn’t support schema evolution, you’ll have to update the schema stored in the metastore somehow. For example, you can create a new external table with the merged schema. Say you have a Hive table t1:
CREATE TABLE t1 (c0 INT, c1 DOUBLE);
By default, this table is stored in the HDFS path hdfs://some-host:9000/user/hive/warehouse/t1. Now you append some Parquet data with an extra column c2 to the same directory:
import org.apache.spark.sql.types._
val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
val df1 = sqlContext.range(10).select('id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
df1.write.mode("append").parquet(path)
Now you can create a new external table t2 like this:
val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
df2.write.option("path", path).saveAsTable("t2")
Since we specified a path above, the newly created t2 is an external table pointing to the original HDFS location, but the schema of t2 is the merged version. The drawback of this approach is that t2 is actually a Spark SQL-specific data source table rather than a genuine Hive table. This means it can be accessed by Spark SQL only; we’re just using the Hive metastore to help persist the metadata of the data source table. However, since you’re asking how to access the new table via the Spark SQL CLI, this should work for you. We are working on making Parquet and ORC data source tables accessible via Hive in Spark 1.5.0. Cheng On 7/22/15 10:32 AM, Jerrick Hoang wrote: Hi Lian, Sorry, I'm new to Spark, so I did not express myself very clearly. I'm concerned about the situation where, let's say, I have a Parquet table with some partitions, I add a new column A to the Parquet schema, and I write some data with the new schema to a new partition in the table. If I'm not mistaken, if I do sqlContext.read.parquet(table_path).printSchema() it will print the correct schema with the new column A.
But if I do a 'describe table' from the Spark SQL CLI, I won't see the new column. I understand that this is because Hive doesn't support schema evolution. So what is the best way to support CLI queries in this situation? Do I need to manually alter the table every time the underlying schema changes? Thanks On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Jerrick, What do you mean by schema evolution with Hive metastore tables? Hive doesn't take schema evolution into account. Could you please give a concrete use case? Are you trying to write Parquet data with extra columns into an existing metastore Parquet table? Cheng On 7/21/15 1:04 AM, Jerrick Hoang wrote: I'm new to Spark; any ideas would be much appreciated! Thanks On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I'm aware of the support for schema evolution via the DataFrame API. Just wondering what would be the best way to go about dealing with schema evolution with Hive metastore tables. So, say I create a table via the Spark SQL CLI; how would I deal with Parquet schema evolution? Thanks, J
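To illustrate what "merged schema" means in Cheng's example, here is a toy model in plain Python: the union of columns across file schemas, keeping first-seen order, with a conflict check. It ignores nesting, nullability and type widening, so it approximates what Parquet schema merging produces rather than reproducing Spark's implementation.

```python
from typing import Dict, List

Schema = Dict[str, str]  # column name -> type name, in insertion order


def merge_schemas(schemas: List[Schema]) -> Schema:
    """Union of columns across schemas, keeping the first-seen column order.

    Raises ValueError if the same column appears with two different types.
    """
    merged: Schema = {}
    for schema in schemas:
        for name, dtype in schema.items():
            if name in merged and merged[name] != dtype:
                raise ValueError(f"conflicting types for {name}: {merged[name]} vs {dtype}")
            merged.setdefault(name, dtype)
    return merged


old_files = {"c0": "int", "c1": "double"}
new_files = {"c0": "int", "c1": "double", "c2": "string"}
print(merge_schemas([old_files, new_files]))  # {'c0': 'int', 'c1': 'double', 'c2': 'string'}
```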
Re: many-to-many join
Hi RDD solution: u = [(615,1),(720,1),(615,2)] urdd=sc.parallelize(u,1) a1 = [(615,'T'),(720,'C')] ardd=sc.parallelize(a1,1) def addString(s1,s2): ... return s1+','+s2 j = urdd.join(ardd).map(lambda t:t[1]).reduceByKey(addString) print j.collect() [(2, 'T'), (1, 'C,T')] However, if you can assume number of users is far far greater than number of distinct area codes, you may think to broadcast variable in a dict format and look up in the map. Like this u = [(1,615),(1,720),(2,615)] a = {615:'T',720:'C'} urdd=sc.parallelize(u) def usr_area_state(tup): ... uid=tup[0] ... aid=tup[1] ... sid=bc.value[aid] ... return uid,(sid,) ... bc=sc.broadcast(a) usrdd=urdd.map(usr_area_state) def addTuple(t1,t2): ... return t1+t2 ... out=usrdd.reduceByKey(addTuple) print out.collect() [(1, ('T', 'C')), (2, ('T',))] Best Ayan On Wed, Jul 22, 2015 at 5:14 PM, Sonal Goyal sonalgoy...@gmail.com wrote: If I understand this correctly, you could join area_code_user and area_code_state and then flat map to get user, areacode, state. Then groupby/reduce by user. You can also try some join optimizations like partitioning on area code or broadcasting smaller table depending on size of area_code_state. On Jul 22, 2015 10:15 AM, John Berryman jo...@eventbrite.com wrote: Quick example problem that's stumping me: * Users have 1 or more phone numbers and therefore one or more area codes. * There are 100M users. * States have one or more area codes. * I would like to the states for the users (as indicated by phone area code). 
I was thinking about something like this: if area_code_user looks like (area_code, [user_id]), e.g. (615, [1234567]), and area_code_state looks like (area_code, state), e.g. (615, [Tennessee]), then we could do

    states_and_users_mixed = area_code_user.join(area_code_state) \
        .reduceByKey(lambda a, b: a + b) \
        .values()
    user_state_pairs = states_and_users_mixed.flatMap(
        emit_cartesian_prod_of_userids_and_states)
    user_to_states = user_state_pairs.reduceByKey(lambda a, b: a + b)
    user_to_states.first()
    # (1234567, [Tennessee, Tennessee, California])

This would work, but user_state_pairs is just a list of user_ids and state names mixed together, and emit_cartesian_prod_of_userids_and_states has to pair them correctly. This is problematic because 1) it's weird and sloppy, and 2) there will be lots of users per state, and having so many users in a single row is going to make emit_cartesian_prod_of_userids_and_states work extra hard to first locate the states and then emit all user_id-state pairs. How should I be doing this? Thanks, -John

-- 
Best Regards,
Ayan Guha
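The broadcast-dictionary approach from this thread can be checked without a cluster. The following is a minimal pure-Python sketch, not Spark code: a plain dict stands in for the broadcast variable, and a dict of sets stands in for reduceByKey. The function name `states_per_user` is hypothetical, and it deduplicates states on the assumption that a user with two phone numbers in the same state should list that state once (John's example output shows Tennessee twice).

```python
from collections import defaultdict


def states_per_user(user_area_pairs, area_to_state):
    """Map (user_id, area_code) pairs to {user_id: sorted list of distinct states}.

    area_to_state plays the role of the broadcast dict; the defaultdict plays
    the role of reduceByKey, grouping states under each user.
    """
    grouped = defaultdict(set)
    for user_id, area_code in user_area_pairs:
        state = area_to_state.get(area_code)
        if state is not None:  # skip area codes missing from the lookup table
            grouped[user_id].add(state)
    return {uid: sorted(states) for uid, states in grouped.items()}


pairs = [(1, 615), (1, 720), (2, 615), (3, 615), (3, 931)]
lookup = {615: "Tennessee", 720: "Colorado", 931: "Tennessee"}
print(states_per_user(pairs, lookup))
```

In Spark the per-record lookup would run inside `urdd.map(...)` and read `bc.value` instead of a local dict, which avoids the join shuffle of the large user RDD; the final per-user grouping still shuffles by user.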
Scaling spark cluster for a running application
I have a Spark cluster running in client mode, with the driver outside the Spark cluster. I want to scale the cluster after an application has been submitted. To do this, I'm creating new workers, and they register with the master, but the running application does not use the newly added workers. Hence I cannot add more resources to an existing running application. Is there another way or a config setting to handle this use case? How can I make a running application request executors from a newly added worker node?

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Scaling-spark-cluster-for-a-running-application-tp23951.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Re: Is spark suitable for real time query
Real-time is, of course, relative, but you've mentioned the microsecond level. Spark is designed to process large amounts of data in a distributed fashion. No distributed system I know of could give any kind of guarantee at the microsecond level. Robin

On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:
Hi all, I am using a Spark jar in standalone mode to fetch data from different MySQL instances and perform some actions, but I found that the time is at the second level. So I want to know: is a Spark job suitable for real-time queries at the microsecond level?
Proper saving/loading of MatrixFactorizationModel
Hi all! I have a MatrixFactorizationModel object. If I try to recommend products to a single user right after constructing the model through ALS.train(...), it takes 300 ms (for my data and hardware). But if I save the model to disk and load it back, recommendation takes almost 2000 ms. Spark also warns:

15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor does not have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: User factor is not cached. Prediction could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor does not have a partitioner. Prediction on individual records could be slow.
15/07/17 11:05:47 WARN MatrixFactorizationModel: Product factor is not cached. Prediction could be slow.

How can I create/set a partitioner and cache the user and product factors after loading the model? The following approach didn't help:

    model.userFeatures().cache();
    model.productFeatures().cache();

I also tried repartitioning those RDDs and creating a new model from the repartitioned versions, but that didn't help either.
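For context on why both factor sets matter: a matrix-factorization model predicts a user's rating for a product as the dot product of their factor vectors, so recommending for one user means looking up that user's vector and then scoring every product vector. A partitioner lets the single-user lookup touch one partition instead of scanning all of them, and caching avoids re-reading the factors from storage on each call. The pure-Python sketch below only illustrates the scoring step (it is not MLlib code; `recommend_products` and the toy factors are hypothetical).

```python
def recommend_products(user_vec, product_factors, k):
    """Return the k (product_id, score) pairs with the highest dot-product score.

    product_factors maps product_id -> factor vector (same length as user_vec).
    Every product is scored, so the cost grows with the number of products.
    """
    scored = [
        (pid, sum(u * p for u, p in zip(user_vec, vec)))
        for pid, vec in product_factors.items()
    ]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


products = {10: [1.0, 0.0], 11: [0.25, 0.5], 12: [0.0, 2.0]}
print(recommend_products([1.0, 1.0], products, 2))
```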
Is spark suitable for real time query
Hi all, I am using a Spark jar in standalone mode to fetch data from different MySQL instances and perform some actions, but I found that the time is at the second level. So I want to know: is a Spark job suitable for real-time queries at the microsecond level?
Re: Broadcast variables in R
Thank you very much, Shivaram. I've got it working on Mac now by specifying the namespace: using SparkR:::parallelize() instead of just parallelize(). Kind regards, Serge

On 21 Jul 2015, at 17:20, Shivaram Venkataraman shiva...@eecs.berkeley.edu wrote:
There shouldn't be anything Mac OS specific about this feature. One word of warning, though: as mentioned previously in this thread, the APIs were made private because we aren't sure we will support them in the future. If you are using these APIs, it would be good to chime in on the JIRA with your use case. Thanks, Shivaram

On Tue, Jul 21, 2015 at 2:34 AM, Serge Franchois serge.franch...@altran.com wrote:
I might add that I've done the same exercise on Linux (CentOS 6), and there broadcast variables ARE working. Is this functionality perhaps not exposed on Mac OS X? Or does it have to do with the fact that there are no native Hadoop libs for Mac?
Re: Is spark suitable for real time query
I did a simple test using Spark in standalone mode (not a cluster) and found that a simple action takes a few seconds, even though the data is small, just a few rows. So does each Spark job spend some time on initialization or preparation work no matter what the job is? I mean, does the basic framework of a Spark job by itself cost seconds?

2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk:
Real-time is, of course, relative, but you've mentioned the microsecond level. Spark is designed to process large amounts of data in a distributed fashion. No distributed system I know of could give any kind of guarantee at the microsecond level. Robin

On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:
Hi all, I am using a Spark jar in standalone mode to fetch data from different MySQL instances and perform some actions, but I found that the time is at the second level. So I want to know: is a Spark job suitable for real-time queries at the microsecond level?
Need help in SparkSQL
Hi all, I have data in MongoDB (a few TBs) which I want to migrate to HDFS to do complex query analysis on. The queries are like AND queries involving multiple fields. So my question is: in which format should I store the data in HDFS so that processing will be fast for these kinds of queries? Regards, Jeetendra
Re: How to build Spark with my own version of Hadoop?
As you know, the Hadoop versions and so on are available in the Spark build files; IIRC the top-level pom.xml has all the Maven variables for versions. So I think if you just build Hadoop locally (i.e. build it as, say, 2.2.1234-SNAPSHOT and mvn install it), you should be able to change the corresponding variable in the top-level Spark pom.xml.

Of course, this is a Pandora's box: now you also need to deploy your custom YARN on your cluster, make sure it matches the Spark target, and so on (if you're running Spark on YARN). RPM and DEB packages tend to be useful for this kind of thing, since you can easily sync the /etc/ config files and uniformly manage/upgrade versions, etc.

Thus, if you're really serious about building a custom distribution, mixing and matching Hadoop components separately, you might want to consider using Apache Bigtop; just bring this up on that mailing list. We curate a Hadoop distribution builder that builds Spark, Hadoop, Hive, Ignite, Kafka, ZooKeeper, HBase and so on. Since Bigtop has all the tooling necessary to fully build, test, and deploy your Hadoop bits on VMs/containers, it might make your life a little easier.

On Tue, Jul 21, 2015 at 11:11 PM, Dogtail Ray spark.ru...@gmail.com wrote:
Hi, I have modified some Hadoop code and want to build Spark with the modified version of Hadoop. Do I need to change the compilation dependency files? How do I do that? Great thanks!

-- jay vyas
Re: Is spark suitable for real time query
You can use the Spark REST job server (or any other solution that provides a long-running Spark context) so that you don't pay this bootstrap time on each query. In addition, if you have an RDD that you want your queries to run against, you can cache it in memory (depending on your cluster's memory size) so that you don't pay the time of reading it from disk.

On 22 July 2015 at 14:46, Louis Hust louis.h...@gmail.com wrote:
I did a simple test using Spark in standalone mode (not a cluster) and found that a simple action takes a few seconds, even though the data is small, just a few rows. So does each Spark job spend some time on initialization or preparation work no matter what the job is? I mean, does the basic framework of a Spark job by itself cost seconds?

2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk:
Real-time is, of course, relative, but you've mentioned the microsecond level. Spark is designed to process large amounts of data in a distributed fashion. No distributed system I know of could give any kind of guarantee at the microsecond level. Robin

On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:
Hi all, I am using a Spark jar in standalone mode to fetch data from different MySQL instances and perform some actions, but I found that the time is at the second level. So I want to know: is a Spark job suitable for real-time queries at the microsecond level?
java.lang.IllegalArgumentException: problem reading type: type = group, name = param, original type = null
Hello! I don't understand why, but I can't read data from my Parquet file. I made the Parquet file from a JSON file and read it into a DataFrame:

    df.printSchema()
     |-- param: struct (nullable = true)
     |    |-- FORM: string (nullable = true)
     |    |-- URL: string (nullable = true)

When I try to read any record, I get an error:

    df.select("param").first()

15/07/22 13:06:15 ERROR Executor: Exception in task 0.0 in stage 8.0 (TID 4)
java.lang.IllegalArgumentException: problem reading type: type = group, name = param, original type = null
    at parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:132)
    at parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:106)
    at parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
    at parquet.schema.MessageTypeParser.parse(MessageTypeParser.java:89)
    at parquet.schema.MessageTypeParser.parseMessageType(MessageTypeParser.java:79)
    at parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:189)
    at parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:138)
    at org.apache.spark.sql.sources.SqlNewHadoopRDD$$anon$1.<init>(SqlNewHadoopRDD.scala:153)
    at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:124)
    at org.apache.spark.sql.sources.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:66)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: expected one of [REQUIRED, OPTIONAL, REPEATED] got utm_medium at line 29: optional binary amp;utm_medium
    at parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:203)
    at parquet.schema.MessageTypeParser.addType(MessageTypeParser.java:101)
    at parquet.schema.MessageTypeParser.addGroupTypeFields(MessageTypeParser.java:96)
    at parquet.schema.MessageTypeParser.addGroupType(MessageTypeParser.java:130)
    ... 24 more
Caused by: java.lang.IllegalArgumentException: No enum constant parquet.schema.Type.Repetition.UTM_MEDIUM
    at java.lang.Enum.valueOf(Enum.java:238)
    at parquet.schema.Type$Repetition.valueOf(Type.java:70)
    at parquet.schema.MessageTypeParser.asRepetition(MessageTypeParser.java:201)
    ... 27 more
15/07/22 13:06:15 WARN TaskSetManager: Lost task 0.0 in stage 8.0 (TID 4, localhost): java.lang.IllegalArgumentException: problem reading type: type = group, name = param, original type = null
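The innermost cause points at a field named `amp;utm_medium` (an HTML-escaped `&amp;` left over in a URL query key), and the Parquet schema parser appears to treat the `;` in that name as the end of the field declaration, then reads `utm_medium` where it expects REQUIRED/OPTIONAL/REPEATED. One possible workaround, sketched here as an assumption rather than a confirmed fix, is to clean the JSON keys before writing Parquet by replacing characters outside `[A-Za-z0-9_]` with underscores. The `sanitize_keys` helper is hypothetical and works on already-parsed JSON records in plain Python.

```python
import re

# Anything that is not a letter, digit or underscore gets replaced.
_INVALID = re.compile(r"[^A-Za-z0-9_]")


def sanitize_keys(value):
    """Recursively replace characters outside [A-Za-z0-9_] in dict keys with '_'.

    Lists are walked element by element; scalars are returned unchanged.
    Note: two keys that differ only in invalid characters would collide.
    """
    if isinstance(value, dict):
        return {_INVALID.sub("_", k): sanitize_keys(v) for k, v in value.items()}
    if isinstance(value, list):
        return [sanitize_keys(v) for v in value]
    return value


record = {"param": {"FORM": "x", "URL": "http://a", "amp;utm_medium": "email"}}
print(sanitize_keys(record))
```

The cleaned records could then be written back out as JSON lines (for example with `json.dumps`) and loaded into Spark as before.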
No suitable driver found for jdbc:mysql://
Hi all, I have a cluster with Spark 1.4. I am trying to save data to MySQL but I'm getting this error:

Exception in thread "main" java.sql.SQLException: No suitable driver found for jdbc:mysql://.rds.amazonaws.com:3306/DAE_kmer?user=password=

I looked at https://issues.apache.org/jira/browse/SPARK-8463 and added the connector jar to the same location as on the master, using the copy-dir script. But I am still getting the same error. This used to work with 1.3. This is my command to run the program:

    $SPARK_HOME/bin/spark-submit \
      --jars /root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar \
      --conf spark.executor.extraClassPath=/root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar \
      --conf spark.executor.memory=55g \
      --driver-memory=55g \
      --master=spark://ec2-52-25-191-999.us-west-2.compute.amazonaws.com:7077 \
      --class saveBedToDB \
      target/scala-2.10/adam-project_2.10-1.0.jar

What else can I do? Thanks, -Roni
Re: Need help in SparkSQL
Can you provide an example of an AND query? If you only do lookups, you should try HBase/Phoenix; otherwise you can try ORC with storage indexes and/or compression, but this depends on what your queries look like.

On Wed, 22 Jul 2015 at 14:48, Jeetendra Gangele gangele...@gmail.com wrote:
Hi all, I have data in MongoDB (a few TBs) which I want to migrate to HDFS to do complex query analysis on. The queries are like AND queries involving multiple fields. So my question is: in which format should I store the data in HDFS so that processing will be fast for these kinds of queries? Regards, Jeetendra
Re: Performance issue with Spark's foreachPartition method
Thanks, Robin, for your reply. I'm pretty sure that writing to Oracle is what takes longer, as writing to HDFS only takes ~5 minutes. The job writes about 5 million records. I've set the job to call executeBatch() when the batch size reaches 200,000 records, so I assume that commit will be invoked every 200K records. In this case it should only call commit 25 times; is this too much? I wouldn't want to increase the batch size any further, as it may cause Java heap issues. I do not have much knowledge of the Oracle side, so any advice on the configuration would be appreciated. Thanks, Raj

On 22 July 2015 at 20:20, Robin East robin.e...@xense.co.uk wrote:
The first question I would ask is: have you determined whether you have a performance issue writing to Oracle? In particular, how many commits are you making? If you are issuing a lot of commits, that would be a performance problem. Robin

On 22 Jul 2015, at 19:11, diplomatic Guru diplomaticg...@gmail.com wrote:
Hello all, we are having a major performance issue with Spark, which is holding us back from going live. We have a job that carries out computation on log files and writes the results into an Oracle DB. The reducer 'reduceByKey' has been set to a parallelism of 4, as we don't want to establish too many DB connections. We then call foreachPartition on the RDD pairs that were reduced by key. Within this foreachPartition method we establish a DB connection, iterate over the results, prepare the Oracle statement for batch insertion, then commit the batch and close the connection. All of this works fine. However, when we execute the job to process 12GB of data, it takes forever to complete, especially at the foreachPartition stage. We submitted the job with 6 executors, 2 cores, and 6GB of memory, of which 0.3 is assigned to spark.storage.memoryFraction. The job takes about 50 minutes to complete, which is not ideal. I'm not sure how we could improve the performance.
I've provided the main body of the code; please take a look and advise.

From the driver:

    reduceResultsRDD.foreachPartition(new DB.InsertFunction(dbuser, dbpass, batchsize));

DB class:

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.Iterator;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import scala.Tuple2;

    public class DB {
        private static final Logger logger = LoggerFactory.getLogger(DB.class);

        public static class InsertFunction implements VoidFunction<Iterator<Tuple2<String, String>>> {
            private static final long serialVersionUID = 55766876878L;
            private String dbuser = "";
            private String dbpass = "";
            private int batchsize;

            public InsertFunction(String dbuser, String dbpass, int batchsize) {
                super();
                this.dbuser = dbuser;
                this.dbpass = dbpass;  // the original assigned dbuser twice
                this.batchsize = batchsize;
            }

            @Override
            public void call(Iterator<Tuple2<String, String>> results) {
                Connection connect = null;
                PreparedStatement pstmt = null;
                try {
                    connect = getDBConnection(dbuser, dbpass);
                    int count = 0;
                    if (batchsize <= 0) {  // guard against a non-positive batch size
                        batchsize = 1;
                    }
                    pstmt = connect.prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
                    while (results.hasNext()) {
                        Tuple2<String, String> kv = results.next();
                        String[] data = kv._1.concat("," + kv._2).split(",");
                        pstmt.setString(1, data[0]);
                        pstmt.setString(2, data[1]);
                        // ... (remaining parameters elided)
                        pstmt.addBatch();
                        count++;
                        if (count == batchsize) {
                            logger.info("BulkCount : " + count);
                            pstmt.executeBatch();
                            connect.commit();
                            count = 0;
                        }
                        // NOTE: these two calls sit inside the loop, so they run once per record
                        pstmt.executeBatch();
                        connect.commit();
                    }
                    // flush and commit whatever is left after the loop
                    pstmt.executeBatch();
                    connect.commit();
                } catch (Exception e) {
                    logger.error("InsertFunction error: " + e.getMessage());
                } finally {
                    try {
                        if (pstmt != null) {
                            pstmt.close();
                        }
                        if (connect != null) {
                            connect.close();
                        }
                    } catch (SQLException e) {
                        logger.error("InsertFunction Connection Close error: " + e.getMessage());
                    }
                }
            }
        }
    }
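The intended pattern, one executeBatch()/commit() per full batch plus one final flush for the remainder, can be sketched in plain Python with a stand-in for the JDBC calls. `write_in_batches` and the `flush` callback are hypothetical names; in the real job `flush` would wrap executeBatch() and commit(). With 5 million records and a batch size of 200,000 this flushes 25 times, whereas a flush placed inside the record loop, as the posted code also has after the `if` block, runs once per record.

```python
from typing import Callable, Iterable, List


def write_in_batches(records: Iterable, batch_size: int,
                     flush: Callable[[List], None]) -> int:
    """Buffer records and call flush() once per full batch, plus once for the tail.

    Returns the number of flushes (i.e. commits in the JDBC analogy).
    """
    if batch_size <= 0:
        batch_size = 1
    buffer, flushes = [], 0
    for record in records:
        buffer.append(record)          # analogous to pstmt.addBatch()
        if len(buffer) == batch_size:
            flush(buffer)              # analogous to executeBatch() + commit()
            flushes += 1
            buffer = []
    if buffer:                         # final partial batch
        flush(buffer)
        flushes += 1
    return flushes


sizes = []
n = write_in_batches(range(5), 2, lambda batch: sizes.append(len(batch)))
print(n, sizes)
```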
ShuffledHashJoin instead of CartesianProduct
Hello, I'm trying to link records from two large data sources. Both datasets have almost the same number of rows. The goal is to match records based on multiple columns.

    val matchId = SFAccountDF.as("SF").join(ELAccountDF.as("EL"))
      .where($"SF.Email" === $"EL.EmailAddress" || $"SF.Phone" === $"EL.Phone")

Joining with an OR (||) will result in a CartesianProduct. I'm trying to avoid that. One way to do this is to join on each column and UNION the results:

    val phoneMatch = SFAccountDF.as("SF").filter("Phone != ''")
      .join(ELAccountDF.as("EL").filter("BusinessPhone != ''"))
      .where($"SF.Phone" === $"EL.BusinessPhone")
    val emailMatch = SFAccountDF.as("SF").filter("Email != ''")
      .join(ELAccountDF.as("EL").filter("EmailAddress != ''"))
      .where($"SF.Email" === $"EL.EmailAddress")
    val matchId = phoneMatch.unionAll(emailMatch.unionAll(faxMatch.unionAll(mobileMatch)))
    matchId.cache().registerTempTable("matchId")

Is there a more elegant way to do this? On a related note, has anyone worked on record linkage using Bloom filters, Levenshtein distance, etc. in Spark?

Srikanth
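The join-per-column-then-union idea can be sketched in plain Python: for each key column, build a hash index on one side, probe it with the other side, and collect matching id pairs. This illustrates the technique only, not Spark's planner; the record layout and the `match_on_any` name are assumptions. One point the sketch makes visible: a pair that matches on both email and phone is found twice, and `unionAll` keeps such duplicates, so a `distinct()` on the matched ids may be needed afterwards.

```python
from collections import defaultdict


def match_on_any(left, right, key_pairs):
    """Return the set of (left_id, right_id) pairs equal on at least one key pair.

    left/right: lists of dicts with an "id" field.
    key_pairs: list of (left_column, right_column) names to compare.
    Empty values are skipped, mirroring the != '' filters in the Scala version.
    """
    matches = set()
    for left_col, right_col in key_pairs:
        index = defaultdict(list)      # build side: right_col value -> right ids
        for row in right:
            if row.get(right_col):
                index[row[right_col]].append(row["id"])
        for row in left:               # probe side
            value = row.get(left_col)
            if value:
                for right_id in index.get(value, []):
                    matches.add((row["id"], right_id))  # the set drops duplicate pairs
    return matches


sf = [{"id": "s1", "Email": "a@x.com", "Phone": "111"},
      {"id": "s2", "Email": "", "Phone": "222"}]
el = [{"id": "e1", "EmailAddress": "a@x.com", "BusinessPhone": "111"},
      {"id": "e2", "EmailAddress": "b@x.com", "BusinessPhone": "222"}]
keys = [("Email", "EmailAddress"), ("Phone", "BusinessPhone")]
print(sorted(match_on_any(sf, el, keys)))
```

Each pass is an equi-join on a single column, which is what lets Spark use a hash or sort-merge join per column instead of a Cartesian product.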
What if requested cores are not satisfied?
Hi, assume the following scenario: the Spark standalone cluster has 10 cores in total, and I have an application that will request 12 cores. Will the application run with fewer cores than requested, or will it simply wait forever since only 10 cores are available? I would guess it will run with fewer cores, but I haven't had a chance to try/test it. Thanks. bit1...@163.com
Hive Session gets overwritten in ClientWrapper
I'm currently using Spark 1.4 in standalone mode. I've forked the Apache Hive branch from https://github.com/pwendell/hive and customised it in the following way: I added a thread-local variable in the SessionManager class, and I'm setting the session variable in my custom Authenticator class. To achieve this, I've built the necessary jars (hive-common-0.13.1c.jar, hive-exec-0.13.1c.jar, hive-metastore-0.13.1c.jar, hive-serde-0.13.1c.jar, hive-service-0.13.1c.jar) from https://github.com/pwendell/hive and added them to Spark's classpath. This feature works in Spark 1.3.1 but is broken in Spark 1.4. When I looked into it, I found that the ClientWrapper class creates a new SessionState and uses it thereafter. As a result, I'm not able to retrieve the info I had stored earlier in the session. I'm also not able to retrieve a value from hiveconf that was set earlier. When I looked into the source code of ClientWrapper.scala, I found the following:

    // Create an internal session state for this ClientWrapper.
    val state = {
      val original = Thread.currentThread().getContextClassLoader
      // Switch to the initClassLoader.
      Thread.currentThread().setContextClassLoader(initClassLoader)

From what I can understand, the above tries to use the existing Hive session, or else creates its own session. Am I right? If so, is there a bug causing ClientWrapper not to use the existing session? Or should I implement my requirement in a different way? My requirement is to have a custom session variable and use it throughout the session.
My usage is as follows:
To set the value: SessionManager.setSessionVar(value);
To retrieve the value: SessionManager.getSessionVar();
To set a hiveconf: hiveConf.set(conf, ConfVars.VAR, val);
To retrieve a hiveconf: hiveConf.get(ConfVars.VAR); SessionState.get().getConf().getVar(ConfVars.VAR)
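One thing worth ruling out: a value stored in a thread-local is only visible on the thread that stored it, so if the code that reads it runs on a different thread (an assumption here; the post does not say which thread ClientWrapper uses), the lookup comes back empty even though nothing was overwritten. A minimal Python analogue using `threading.local` (hypothetical names, not Hive's SessionManager API):

```python
import threading

session_vars = threading.local()   # stand-in for a thread-local session variable


def read_session_var():
    # Unset attributes on threading.local raise AttributeError, so use a default.
    return getattr(session_vars, "value", None)


session_vars.value = "user-42"     # set on the main thread
seen_elsewhere = []
worker = threading.Thread(target=lambda: seen_elsewhere.append(read_session_var()))
worker.start()
worker.join()

print(read_session_var(), seen_elsewhere)
```

The main thread still sees its value, while the worker thread sees nothing, which is the same symptom as "the value I stored earlier is gone".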
Re: Need help in SparkSQL
I do not think you can put all your queries into the row key without duplicating the data for each query. However, that would be more of a last resort. Have you checked out Phoenix for HBase? It might suit your needs. It makes things much simpler, because it provides SQL on top of HBase. Nevertheless, Hive could also be a viable alternative, depending on how often you run queries, etc.

On Thu, 23 Jul 2015 at 7:14, Jeetendra Gangele gangele...@gmail.com wrote:
The queries will be something like this:
1. How many users visited 1 BHK flats in the last hour in a given area?
2. How many visitors were there for flats in a given area?
3. List all users who bought a given property in the last 30 days.
Further, queries may get more complex, involving multiple parameters. The problem with HBase is designing the row key to get this data efficiently. Since I have multiple fields to query on, HBase may not be a good choice? I don't want to iterate over the result set that HBase returns to compute the result, because this will kill performance.

On 23 July 2015 at 01:02, Jörn Franke jornfra...@gmail.com wrote:
Can you provide an example of an AND query? If you only do lookups, you should try HBase/Phoenix; otherwise you can try ORC with storage indexes and/or compression, but this depends on what your queries look like.

On Wed, 22 Jul 2015 at 14:48, Jeetendra Gangele gangele...@gmail.com wrote:
Hi all, I have data in MongoDB (a few TBs) which I want to migrate to HDFS to do complex query analysis on. The queries are like AND queries involving multiple fields. So my question is: in which format should I store the data in HDFS so that processing will be fast for these kinds of queries? Regards, Jeetendra

-- 
Hi, find my attached resume. I have around 7 years of total work experience. I worked for Amazon and Expedia in my previous assignments, and I am currently working with a start-up technology company called Insideview in Hyderabad. Regards, Jeetendra
Re: Need help in setting up spark cluster
Can anybody help here?

On 22 July 2015 at 10:38, Jeetendra Gangele gangele...@gmail.com wrote:
Hi all, I am trying to capture user activities for a real estate portal. I am using a RabbitMQ and Spark Streaming combination: I push all the events to RabbitMQ and then consume them with Spark Streaming in 1-second micro-batches. Later on, I am thinking of storing the consumed data for analytics or near-real-time recommendations. Where should I store this data? In a Spark RDD itself, so that people can query it with Spark SQL for analytics or real-time recommendations? This data is not huge; currently it's 10 GB per day. Other alternatives would be HBase or Cassandra; which one would be better? Any suggestions? Also, for this use case, should I use an existing big data platform like Hortonworks, or can I deploy a standalone Spark cluster?
Re: Need help in SparkSQL
The queries will be something like this:
1. How many users visited 1 BHK flats in the last hour in a given area?
2. How many visitors were there for flats in a given area?
3. List all users who bought a given property in the last 30 days.
Further, queries may get more complex, involving multiple parameters. The problem with HBase is designing the row key to get this data efficiently. Since I have multiple fields to query on, HBase may not be a good choice? I don't want to iterate over the result set that HBase returns to compute the result, because this will kill performance.

On 23 July 2015 at 01:02, Jörn Franke jornfra...@gmail.com wrote:
Can you provide an example of an AND query? If you only do lookups, you should try HBase/Phoenix; otherwise you can try ORC with storage indexes and/or compression, but this depends on what your queries look like.

On Wed, 22 Jul 2015 at 14:48, Jeetendra Gangele gangele...@gmail.com wrote:
Hi all, I have data in MongoDB (a few TBs) which I want to migrate to HDFS to do complex query analysis on. The queries are like AND queries involving multiple fields. So my question is: in which format should I store the data in HDFS so that processing will be fast for these kinds of queries? Regards, Jeetendra
Re: Package Release Announcement: Spark SQL on HBase "Astro"
Does it also support insert operations?

On Jul 22, 2015 4:53 PM, Bing Xiao (Bing) bing.x...@huawei.com wrote:
We are happy to announce the availability of the Spark SQL on HBase 1.0.0 release.
http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase
The main features in this package, dubbed “Astro”, include:
· Systematic and powerful handling of data pruning and intelligent scan, based on a partial evaluation technique
· HBase pushdown capabilities like custom filters and coprocessors to support ultra-low-latency processing
· SQL and DataFrame support
· More SQL capabilities made possible (secondary index, Bloom filter, primary key, bulk load, update)
· Joins with data from other sources
· Python/Java/Scala support
· Support for the latest Spark 1.4.0 release
The tests by the Huawei team and community contributors covered these areas: bulk load; projection pruning; partition pruning; partial evaluation; code generation; coprocessor; custom filtering; DML; complex filtering on keys and non-keys; join/union with non-HBase data; DataFrame; multi-column family test. We will post the test results, including performance tests, in the middle of August. You are very welcome to try out or deploy the package, and to help improve the integration tests with various combinations of settings, extensive DataFrame tests, complex join/union tests and extensive performance tests. Please use the “Issues” and “Pull Requests” links on the package homepage if you want to report bugs or make improvement or feature requests. Special thanks to project owner and technical leader Yan Zhou, the Huawei global team, community contributors and Databricks. Databricks has provided great assistance from design to release. “Astro”, the Spark SQL on HBase package, will be useful for ultra-low-latency queries and analytics of large-scale data sets in vertical enterprises. We will continue to work with the community to develop new features and improve the code base.
Your comments and suggestions are greatly appreciated. Yan Zhou / Bing Xiao Huawei Big Data team
Issue with column named count in a DataFrame
I'm trying to do some simple counting and aggregation in an IPython notebook with Spark 1.4.0 and I have encountered behavior that looks like a bug. When I try to filter rows out of an RDD with a column name of count I get a large error message. I would just avoid naming things count, except for the fact that this is the default column name created with the count() operation in pyspark.sql.GroupedData The small example program below demonstrates the issue. from pyspark.sql import SQLContext sqlContext = SQLContext(sc) dataFrame = sc.parallelize([(foo,), (foo,), (bar,)]).toDF([title]) counts = dataFrame.groupBy('title').count() counts.filter(title = 'foo').show() # Works counts.filter(count 1).show() # Errors out I can even reproduce the issue in a PySpark shell session by entering these commands. I suspect that the error has something to with Spark wanting to call the count() function in place of looking at the count column. The error message is as follows: Py4JJavaError Traceback (most recent call last) ipython-input-29-62a1b7c71f21 in module() 1 counts.filter(count 1).show() # Errors Out C:\Users\User\Downloads\spark-1.4.0-bin-hadoop2.6\python\pyspark\sql\dataframe.pyc in filter(self, condition) 774 775 if isinstance(condition, basestring): -- 776 jdf = self._jdf.filter(condition) 777 elif isinstance(condition, Column): 778 jdf = self._jdf.filter(condition._jc) C:\Python27\lib\site-packages\py4j\java_gateway.pyc in __call__(self, *args) 536 answer = self.gateway_client.send_command(command) 537 return_value = get_return_value(answer, self.gateway_client, -- 538 self.target_id, self.name) 539 540 for temp_arg in temp_args: C:\Python27\lib\site-packages\py4j\protocol.pyc in get_return_value(answer, gateway_client, target_id, name) 298 raise Py4JJavaError( 299 'An error occurred while calling {0}{1}{2}.\n'. -- 300 format(target_id, '.', name), value) 301 else: 302 raise Py4JError( Py4JJavaError: An error occurred while calling o229.filter. 
: java.lang.RuntimeException: [1.7] failure: ``('' expected but `>' found count > 1 ^ at scala.sys.package$.error(package.scala:27) at org.apache.spark.sql.catalyst.SqlParser.parseExpression(SqlParser.scala:45) at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:652) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source) at java.lang.reflect.Method.invoke(Unknown Source) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379) at py4j.Gateway.invoke(Gateway.java:259) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:207) at java.lang.Thread.run(Unknown Source) Is there a recommended workaround for the inability to filter on a column named count? Do I have to make a new DataFrame and rename the column just to work around this bug? What's the best way to do that? Thanks, -- Matthew Young - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
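Two workarounds seem worth trying. Both assume that the failure comes from the Spark 1.4 string-expression parser tripping over the bare word count. The first is to quote the name with backticks, as suggested later in this thread. The second is to skip the string parser entirely and pass a Column expression such as counts["count"] > 1. The helpers below (quote_identifier and greater_than are hypothetical names, not Spark API) only build the quoted condition string, so they run without Spark:

```python
def quote_identifier(name):
    """Wrap a column name in backticks so a SQL expression parser treats it
    as an identifier rather than as a keyword or function name."""
    if "`" in name:
        # Escaping embedded backticks is deliberately not handled in this sketch.
        raise ValueError("column names containing backticks are not supported here")
    return "`" + name + "`"


def greater_than(column, value):
    """Build a filter condition string of the form: `column` > value."""
    return "{} > {!r}".format(quote_identifier(column), value)


# Hypothetical usage with the DataFrame from the question:
#   counts.filter(greater_than("count", 1)).show()
# or bypass the string parser entirely with a Column expression:
#   counts.filter(counts["count"] > 1).show()
print(greater_than("count", 1))
```

Renaming the column with withColumnRenamed("count", "n") before filtering should also sidestep the parser. The Column-expression route avoids string building altogether.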
RE: Need help in SparkSQL
Parquet. Mohammed From: Jeetendra Gangele [mailto:gangele...@gmail.com] Sent: Wednesday, July 22, 2015 5:48 AM To: user Subject: Need help in SparkSQL Hi all, I have data in MongoDB (a few TBs) that I want to migrate to HDFS to run complex analytical queries on it, such as AND queries involving multiple fields. So my question is: in which format should I store the data in HDFS so that processing will be fast for these kinds of queries? Regards, Jeetendra
spark-submit and spark-shell behaviors mismatch.
Hi, I have a simple test Spark program, shown below. The strange thing is that it runs well in spark-shell, but under spark-submit it fails with a runtime error, java.lang.NoSuchMethodError, which points at the line val maps2=maps.collect.toMap. Why does it compile without problems and work well in spark-shell (=> maps2: scala.collection.immutable.Map[Int,String] = Map(269953 -> once, 97 -> a, 451002 -> upon, 117481 -> was, 226916 -> there, 414413 -> time, 146327 -> king))? Thanks!

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark._
import SparkContext._

val docs = sc.parallelize(Array(Array("once", "upon", "a", "time"), Array("there", "was", "a", "king")))
val hashingTF = new HashingTF()
val maps = docs.flatMap { term => term.map(ele => (hashingTF.indexOf(ele), ele)) }
val maps2 = maps.collect.toMap

Cheers, Dan
Re: How to keep RDDs in memory between two different batch jobs?
Yes. Tachyon can handle this well: http://tachyon-project.org/ Best, Haoyuan On Wed, Jul 22, 2015 at 10:56 AM, swetha swethakasire...@gmail.com wrote: Hi, We have a requirement to keep RDDs in memory between Spark batch jobs that run every hour. The idea is to keep the RDDs holding active user sessions in memory between two jobs, so that when one job has finished and the next one runs an hour later, the RDDs with active sessions are still available for joining with those in the current job. So, what do we need to keep the data in memory between two batch jobs? Can we use Tachyon? Thanks, Swetha -- Haoyuan Li CEO, Tachyon Nexus http://www.tachyonnexus.com/
Using Wurfl in Spark
Hi all, I am trying to do WURFL lookups in a Spark cluster and am getting exceptions. I am pretty sure that the same thing works at small scale, but it fails when I try it in Spark. I already used spark-ec2/copy-dir to copy the WURFL library to the workers, and I launched spark-shell with the --jars parameter, including WURFL and its dependencies from the lib/ directory. To reproduce the error, let's say that I already have a userAgentRdd, which is an RDD[String], and a userAgentSample, which is an Array[String]. I am trying to reuse the WURFL engine via mapPartitions so that I save the time of reloading it.

import net.sourceforge.wurfl.core.GeneralWURFLEngine

def lookupModel(wurfl: GeneralWURFLEngine)(userAgent: String) = {
  val device = wurfl.getDeviceForRequest(userAgent)
  val brand = device.getCapability("brand_name")
  val model = device.getCapability("model_name")
  (brand, model)
}

def lookupModelPartitions(wurflXmlPath: String)(userAgentIterator: Iterator[String]) = {
  val wurfl = new GeneralWURFLEngine(wurflXmlPath)
  wurfl.setEngineTarget(EngineTarget.accuracy)
  userAgentIterator.map(lookupModel(wurfl))
}

// the following will work
val wurflEngine = new GeneralWURFLEngine("/root/wurfl-1.6.1.0-release/wurfl.zip")
val userAgentSample = // my local dataset
val modelSample = userAgentSample.map(lookupModel(wurflEngine))

// the following will also work
val userAgentRdd = // my spark dataset
val modelRdd = userAgentRdd.mapPartitions(lookupModelPartitions("/root/wurfl-1.6.1.0-release/wurfl.zip"))
modelRdd.take(10)

// but the following will not work
modelRdd.count

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3.0 (TID 491, 10.128.224.227): net.sourceforge.wurfl.core.exc.WURFLRuntimeException: WURFL unexpected exception at net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:286) at
net.sourceforge.wurfl.core.GeneralWURFLEngine.getDeviceForRequest(GeneralWURFLEngine.java:425) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.lookupModel(<console>:23) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(<console>:27) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$lookupModelPartitions$1.apply(<console>:27) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1628) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099) at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1099) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1767) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63) at org.apache.spark.scheduler.Task.run(Task.scala:70) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:745) Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLResourceException: WURFL unexpected exception at net.sourceforge.wurfl.core.resource.XMLResource.readData(XMLResource.java:350) at net.sourceforge.wurfl.core.resource.XMLResource.getData(XMLResource.java:154) at net.sourceforge.wurfl.core.resource.DefaultWURFLModel.init(DefaultWURFLModel.java:118) at net.sourceforge.wurfl.core.resource.DefaultWURFLModel.init(DefaultWURFLModel.java:110) at net.sourceforge.wurfl.core.GeneralWURFLEngine.init(GeneralWURFLEngine.java:304) at net.sourceforge.wurfl.core.GeneralWURFLEngine.initIfNeeded(GeneralWURFLEngine.java:283) ... 
16 more Caused by: net.sourceforge.wurfl.core.resource.exc.WURFLParsingException: The devices with id generic define more is_wireless_device at net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startCapabilityElement(XMLResource.java:680) at net.sourceforge.wurfl.core.resource.XMLResource$WURFLSAXHandler.startElement(XMLResource.java:534) at com.sun.org.apache.xerces.internal.parsers.AbstractSAXParser.startElement(AbstractSAXParser.java:509) at com.sun.org.apache.xerces.internal.parsers.AbstractXMLDocumentParser.emptyElement(AbstractXMLDocumentParser.java:182) at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl.scanStartElement(XMLDocumentFragmentScannerImpl.java:1343) at com.sun.org.apache.xerces.internal.impl.XMLDocumentFragmentScannerImpl$FragmentContentDriver.next(XMLDocumentFragmentScannerImpl.java:2786) at com.sun.org.apache.xerces.internal.impl.XMLDocumentScannerImpl.next(XMLDocumentScannerImpl.java:606) at
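One hypothesis worth ruling out, which is not confirmed anywhere in this thread, concerns how the two actions run. take(10) usually touches a single partition, while count runs many tasks at once inside each executor JVM. That means several GeneralWURFLEngine instances may be initialized concurrently. If the engine's initialization is not safe under concurrency, a parse error about duplicated capabilities would be a plausible symptom.

A common remedy is to create one engine per executor process, lazily and under a lock, instead of one per partition. In Scala this is typically an object holding a lazy val. The Python sketch below shows the same pattern. make_engine is a hypothetical stand-in for new GeneralWURFLEngine(path), and lookup_partition stands in for lookupModelPartitions:

```python
import threading


class LazySingleton:
    """Create a shared resource at most once per process, even when many
    threads (e.g. concurrently running tasks) ask for it at the same time."""

    def __init__(self, factory):
        self._factory = factory   # hypothetical stand-in for the engine constructor
        self._lock = threading.Lock()
        self._instance = None

    def get(self):
        if self._instance is None:           # fast path once initialized
            with self._lock:
                if self._instance is None:   # re-check while holding the lock
                    self._instance = self._factory()
        return self._instance


init_calls = []  # records how often the expensive initialization really runs


def make_engine():
    """Hypothetical stand-in for loading the WURFL engine from disk."""
    init_calls.append(1)
    return object()


engine_holder = LazySingleton(make_engine)


def lookup_partition(user_agents):
    """Every partition in this process reuses the same engine instance."""
    engine = engine_holder.get()
    return [(engine, ua) for ua in user_agents]
```

If the per-JVM engine makes count succeed, that would support the concurrency explanation. If it does not, the WURFL file on the workers themselves would be the next thing to compare against the copy on the driver.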
Package Release Announcement: Spark SQL on HBase Astro
We are happy to announce the availability of the Spark SQL on HBase 1.0.0 release: http://spark-packages.org/package/Huawei-Spark/Spark-SQL-on-HBase

The main features in this package, dubbed Astro, include:
* Systematic and powerful handling of data pruning and intelligent scan, based on a partial evaluation technique
* HBase pushdown capabilities like custom filters and coprocessors to support ultra-low-latency processing
* SQL and DataFrame support
* More SQL capabilities made possible (secondary index, Bloom filter, primary key, bulk load, update)
* Joins with data from other sources
* Python/Java/Scala support
* Support for the latest Spark 1.4.0 release

The tests by the Huawei team and community contributors covered the following areas: bulk load; projection pruning; partition pruning; partial evaluation; code generation; coprocessor; custom filtering; DML; complex filtering on keys and non-keys; join/union with non-HBase data; DataFrame; multi-column-family test. We will post the test results, including performance tests, in the middle of August. You are very welcome to try out or deploy the package, and to help improve the integration tests with various combinations of settings, extensive DataFrame tests, complex join/union tests and extensive performance tests. Please use the Issues and Pull Requests links on the package homepage if you want to report bugs or make improvement or feature requests. Special thanks to project owner and technical leader Yan Zhou, the Huawei global team, community contributors and Databricks. Databricks has provided great assistance from design through release. Astro, the Spark SQL on HBase package, will be useful for ultra-low-latency query and analytics of large-scale data sets in vertical enterprises. We will continue to work with the community to develop new features and improve the code base. Your comments and suggestions are greatly appreciated. Yan Zhou / Bing Xiao Huawei Big Data team
Re: No suitable driver found for jdbc:mysql://
Try setting --driver-class-path. On Wed, Jul 22, 2015 at 3:45 PM, roni roni.epi...@gmail.com wrote: Hi all, I have a cluster with Spark 1.4. I am trying to save data to MySQL but am getting the error Exception in thread "main" java.sql.SQLException: No suitable driver found for jdbc:mysql://.rds.amazonaws.com:3306/DAE_kmer?user=password= I looked at https://issues.apache.org/jira/browse/SPARK-8463 and added the connector jar to the same location as on the master using the copy-dir script, but I am still getting the same error. This used to work with 1.3. This is my command to run the program:

$SPARK_HOME/bin/spark-submit --jars /root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar --conf spark.executor.extraClassPath=/root/spark/lib/mysql-connector-java-5.1.35/mysql-connector-java-5.1.35-bin.jar --conf spark.executor.memory=55g --driver-memory=55g --master=spark://ec2-52-25-191-999.us-west-2.compute.amazonaws.com:7077 --class saveBedToDB target/scala-2.10/adam-project_2.10-1.0.jar

What else can I do? Thanks, -Roni
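A sketch of the --driver-class-path suggestion follows. It assumes the connector jar sits at the same path on the driver and on every worker. The exception is raised in thread "main", i.e. in the driver JVM. As far as I know, java.sql.DriverManager there only sees drivers visible to the system class loader, which --jars alone does not extend. The command Roni posted sets the executor classpath but not the driver's. The helper below is hypothetical and only assembles an argument list using the standard spark-submit flags:

```python
import shlex


def build_submit_args(app_jar, main_class, master, jdbc_jar, extra_conf=None):
    """Hypothetical helper: assemble spark-submit arguments that put a JDBC
    driver jar on both the driver and the executor classpaths."""
    args = [
        "spark-submit",
        "--master", master,
        "--class", main_class,
        "--jars", jdbc_jar,                # ship the jar with the application
        "--driver-class-path", jdbc_jar,   # put it on the driver JVM's classpath
        "--conf", "spark.executor.extraClassPath=" + jdbc_jar,  # and on the executors'
    ]
    for key, value in sorted((extra_conf or {}).items()):
        args += ["--conf", "{}={}".format(key, value)]
    args.append(app_jar)  # the application jar must come after all options
    return args


def to_shell(args):
    """Render the argument list as a copy-pasteable shell command."""
    return " ".join(shlex.quote(a) for a in args)
```

For example, to_shell(build_submit_args("target/scala-2.10/adam-project_2.10-1.0.jar", "saveBedToDB", master_url, connector_jar)) prints a command equivalent to Roni's with --driver-class-path added. Here master_url and connector_jar are placeholders for the values in the command above.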
Re: spark-submit and spark-shell behaviors mismatch.
Is it complaining about collect or toMap? In either case, this error usually indicates an old version. Any chance you have an old installation of Spark somehow? Or of Scala? You can try running spark-submit with --verbose. Also, when you say it runs with spark-shell, do you run spark-shell in local mode or with --master? I'd try with --master set to whatever master you use for spark-submit. Also, if you're using standalone mode, I believe the worker log contains the launch command for the executor; you probably want to examine that classpath carefully. On Wed, Jul 22, 2015 at 5:25 PM, Dan Dong dongda...@gmail.com wrote: Hi, I have a simple test spark program as below, the strange thing is that it runs well under a spark-shell, but will get a runtime error of java.lang.NoSuchMethodError: in spark-submit [...]
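This follows up the suggestion to examine the executor's classpath. A NoSuchMethodError generally means that code compiled against one version of a library is running against another. A quick check is to look for the same artifact in two versions on the launch classpath. The helper below is a hypothetical sketch with two assumptions: jar file names follow the common name-version.jar pattern, and the classpath has been copied from the worker log as a colon-separated string.

```python
import re
from collections import defaultdict

# Assumes jar names look like "<artifact>-<version>[-suffix].jar",
# e.g. "scala-library-2.10.4.jar" or "spark-assembly-1.4.0-hadoop2.6.0.jar".
JAR_PATTERN = re.compile(
    r"(?P<name>[A-Za-z0-9_.-]+?)-(?P<version>\d+(?:\.\d+)+)[^/]*\.jar$"
)


def find_version_conflicts(classpath, sep=":"):
    """Return {artifact: [versions]} for artifacts present in more than one version."""
    versions = defaultdict(set)
    for entry in classpath.split(sep):
        filename = entry.rsplit("/", 1)[-1]
        match = JAR_PATTERN.match(filename)
        if match:
            versions[match.group("name")].add(match.group("version"))
    # Versions are sorted as plain strings, which is enough for display.
    return {name: sorted(vs) for name, vs in versions.items() if len(vs) > 1}
```

Directories and wildcard entries are ignored. Only entries ending in a versioned .jar name are compared, so an empty result does not by itself rule out a mismatch.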
Re: Spark SQL Table Caching
I would be interested in the answer to this question, plus the relationship between those and registerTempTable(). Pedro On Tue, Jul 21, 2015 at 1:59 PM, Brandon White bwwintheho...@gmail.com wrote: A few questions about caching a table in Spark SQL: 1) Is there any difference between caching the DataFrame and caching the table, i.e. df.cache() vs sqlContext.cacheTable(tableName)? 2) Do you need to warm up the cache before seeing the performance benefits? Is the cache LRU? Do you need to run some queries on the table before it is cached in memory? 3) Is caching the table much faster than .saveAsTable? I am only seeing a 10-20% performance increase. -- Pedro Rodriguez UCBerkeley 2014 | Computer Science SnowGeek http://SnowGeek.org pedro-rodriguez.com ski.rodrig...@gmail.com 208-340-1703
Re: Issue with column named count in a DataFrame
Additionally have you tried enclosing count in `backticks`? On Wed, Jul 22, 2015 at 4:25 PM, Michael Armbrust mich...@databricks.com wrote: I believe this will be fixed in Spark 1.5 https://github.com/apache/spark/pull/7237 On Wed, Jul 22, 2015 at 3:04 PM, Young, Matthew T matthew.t.yo...@intel.com wrote: I'm trying to do some simple counting and aggregation in an IPython notebook with Spark 1.4.0 and I have encountered behavior that looks like a bug. [...]
Re: Issue with column named count in a DataFrame
I believe this will be fixed in Spark 1.5 https://github.com/apache/spark/pull/7237 On Wed, Jul 22, 2015 at 3:04 PM, Young, Matthew T matthew.t.yo...@intel.com wrote: I'm trying to do some simple counting and aggregation in an IPython notebook with Spark 1.4.0 and I have encountered behavior that looks like a bug. [...]
Re: assertion failed error with GraphX
I am also having problems with triangle count; it seems this algorithm is very memory-consuming (I could not process even small graphs of ~5 million vertices and 70 million edges with less than 32 GB of RAM on EACH machine). What if I have graphs with a billion edges; how much RAM do I need then? So now I am trying to understand how it works, and maybe rewrite it. I would like to process big graphs without much RAM on each machine. On 20.07.2015 04:27, Jack Yang j...@uow.edu.au wrote: Hi there, I got an error when running one simple GraphX program. My setup is: Spark 1.4.0, Hadoop YARN 2.5, Scala 2.10, with four virtual machines. If I construct one small graph (6 nodes, 4 edges) and run:

println("triangleCount: %s".format(hdfs_graph.triangleCount().vertices.count()))

it returns the correct results. But when I import a much larger graph (with 85 nodes, 500 edges), the error is: 15/07/20 12:03:36 WARN scheduler.TaskSetManager: Lost task 2.0 in stage 11.0 (TID 32, 192.168.157.131): java.lang.AssertionError: assertion failed at scala.Predef$.assert(Predef.scala:165) at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:90) at org.apache.spark.graphx.lib.TriangleCount$$anonfun$7.apply(TriangleCount.scala:87) at org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:140) at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:159) at org.apache.spark.graphx.impl.VertexRDDImpl$$anonfun$3.apply(VertexRDDImpl.scala:156) at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277) at org.apache.spark.rdd.RDD.iterator(RDD.scala:244) I ran both graphs using the same submit command:

spark-submit --class sparkUI.GraphApp --master spark://master:7077 --executor-memory 2G --total-executor-cores 4 myjar.jar

Any thoughts? Is anything wrong with my machines or configuration? Best regards, Jack
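On the assertion failure: as I understand the Spark 1.x GraphX guide, TriangleCount expects two things. Edges must be in canonical orientation (srcId < dstId), and the graph must be partitioned with Graph.partitionBy. Input that contains the same edge in both directions, duplicates or self-loops may therefore trip that assert. The plain-Python sketch below does not use Spark. It shows what canonicalizing an edge list means and then counts triangles per vertex on the cleaned list. The function names are illustrative, not GraphX API.

```python
from collections import defaultdict
from itertools import combinations


def canonicalize(edges):
    """Orient every edge so that src < dst, dropping self-loops and duplicates."""
    return sorted({(min(s, d), max(s, d)) for s, d in edges if s != d})


def triangles_per_vertex(edges):
    """Count, for each vertex, the triangles it belongs to
    (vertices that are in no triangle are omitted)."""
    adjacency = defaultdict(set)
    for s, d in canonicalize(edges):
        adjacency[s].add(d)
        adjacency[d].add(s)
    counts = {}
    for v, neighbours in adjacency.items():
        # Each pair of v's neighbours that are themselves connected closes a triangle at v.
        closed = sum(1 for a, b in combinations(sorted(neighbours), 2)
                     if b in adjacency[a])
        if closed:
            counts[v] = closed
    return counts
```

Summing the per-vertex counts and dividing by 3 gives the number of distinct triangles. In GraphX, the equivalent cleanup would be applied to the edge RDD before building the graph and calling partitionBy.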
Re: Parquet problems
Hi Anders, Did you ever get to the bottom of this issue? I'm encountering it too, but only in yarn-cluster mode running on Spark 1.4.0. I was thinking of trying 1.4.1 today. Michael On Thu, Jun 25, 2015 at 5:52 AM, Anders Arpteg arp...@spotify.com wrote: Yes, both the driver and the executors. It works a little bit better with more space, but there is still a leak that causes failure after a number of reads. There are about 700 different data sources that need to be loaded, lots of data... On Thu, Jun 25, 2015 at 08:02, Sabarish Sasidharan sabarish.sasidha...@manthan.com wrote: Did you try increasing the PermGen for the driver? Regards Sab On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote: When reading large (and many) datasets with the Spark 1.4.0 DataFrames Parquet reader (the org.apache.spark.sql.parquet format), the following exceptions are thrown: Exception in thread task-result-getter-0 Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread task-result-getter-0 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError: PermGen space Exception in thread task-result-getter-1 java.lang.OutOfMemoryError: PermGen space Exception in thread task-result-getter-2 java.lang.OutOfMemoryError: PermGen space and many more like these from different threads. I've tried increasing the PermGen space using the -XX:MaxPermSize VM setting, but even after tripling the space, the same errors occur. I've also tried storing intermediate results, and I am able to get the full job completed by running it multiple times and starting from the last successful intermediate result. There seems to be some memory leak in the Parquet format. Any hints on how to fix this problem? Thanks, Anders
Re: Mesos + Spark
This page, http://spark.apache.org/docs/latest/running-on-mesos.html, covers many of these questions. If you submit a job with the option --supervise, it will be restarted if it fails. You can use Chronos for scheduling. You can create a single streaming job with a 10-minute batch interval, if that covers your need to run every 10 minutes. Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Jul 22, 2015 at 3:53 AM, boci boci.b...@gmail.com wrote: Hi guys! I'm new to Mesos. I have two Spark applications (one streaming and one batch), and I want to run both in a Mesos cluster. For testing, I want to run them in Docker containers, so I started a simple redjack/mesos-master, but a lot of things are unclear to me (about both Mesos and Spark on Mesos). If I have a Mesos cluster (for testing it will be some Docker containers), do I need a separate machine (container) to run my Spark job? Or can I submit to the cluster and schedule it (with Chronos or something else)? How can I run the streaming job? What happens if the controller dies? Or, if I call spark-submit with master=mesos, is my application started so that I can forget about it? How can I run a job every 10 minutes without submitting it every 10 minutes? How can I run my streaming app in HA mode? Thanks b0c1 -- Skype: boci13, Hangout: boci.b...@gmail.com
Re: Spark-hive parquet schema evolution
While it's not recommended to overwrite files Hive thinks it understands, you can add the column to Hive's metastore with an ALTER TABLE command, using HiveQL in the Hive shell or HiveContext.sql():

ALTER TABLE mytable ADD COLUMNS (col_name data_type)

See https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL#LanguageManualDDL-AlterTable/Partition/Column for full details. dean Dean Wampler, Ph.D. Author: Programming Scala, 2nd Edition http://shop.oreilly.com/product/0636920033073.do (O'Reilly) Typesafe http://typesafe.com @deanwampler http://twitter.com/deanwampler http://polyglotprogramming.com On Wed, Jul 22, 2015 at 4:36 AM, Cheng Lian lian.cs@gmail.com wrote: Since Hive doesn't support schema evolution, you'll have to update the schema stored in the metastore somehow. For example, you can create a new external table with the merged schema. Say you have a Hive table t1:

CREATE TABLE t1 (c0 INT, c1 DOUBLE);

By default, this table is stored in the HDFS path hdfs://some-host:9000/user/hive/warehouse/t1. Now you append some Parquet data with an extra column c2 to the same directory:

import org.apache.spark.sql.types._
import sqlContext.implicits._  // enables the 'symbol column syntax (already imported in spark-shell)
val path = "hdfs://some-host:9000/user/hive/warehouse/t1"
val df1 = sqlContext.range(10).select('id as 'c0, 'id cast DoubleType as 'c1, 'id cast StringType as 'c2)
df1.write.mode("append").parquet(path)

Now you can create a new external table t2 like this:

val df2 = sqlContext.read.option("mergeSchema", "true").parquet(path)
df2.write.option("path", path).saveAsTable("t2")

Since we specified a path above, the newly created t2 is an external table pointing to the original HDFS location, but the schema of t2 is the merged version. The drawback of this approach is that t2 is actually a Spark SQL-specific data source table rather than a genuine Hive table. This means it can be accessed by Spark SQL only. We're just using the Hive metastore to help persist the metadata of the data source table.
However, since you're asking how to access the new table via the Spark SQL CLI, this should work for you. We are working on making Parquet and ORC data source tables accessible via Hive in Spark 1.5.0. Cheng On 7/22/15 10:32 AM, Jerrick Hoang wrote: Hi Lian, Sorry, I'm new to Spark, so I did not express myself very clearly. I'm concerned about the following situation: let's say I have a Parquet table with some partitions, and I add a new column A to the Parquet schema and write some data with the new schema to a new partition in the table. If I'm not mistaken, if I do a sqlContext.read.parquet(table_path).printSchema(), it will print the correct schema with the new column A. But if I do a 'describe table' from the Spark SQL CLI, I won't see the new column. I understand that this is because Hive doesn't support schema evolution. So what is the best way to support CLI queries in this situation? Do I need to manually alter the table every time the underlying schema changes? Thanks On Tue, Jul 21, 2015 at 4:37 PM, Cheng Lian lian.cs@gmail.com wrote: Hey Jerrick, What do you mean by schema evolution with Hive metastore tables? Hive doesn't take schema evolution into account. Could you please give a concrete use case? Are you trying to write Parquet data with extra columns into an existing metastore Parquet table? Cheng On 7/21/15 1:04 AM, Jerrick Hoang wrote: I'm new to Spark, any ideas would be much appreciated! Thanks On Sat, Jul 18, 2015 at 11:11 AM, Jerrick Hoang jerrickho...@gmail.com wrote: Hi all, I'm aware of the support for schema evolution via the DataFrame API. Just wondering what would be the best way to go about dealing with schema evolution with Hive metastore tables. So, say I create a table via the Spark SQL CLI: how would I deal with Parquet schema evolution? Thanks, J
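Jerrick asks whether the table must be altered manually every time the underlying schema changes. One way to script Dean's suggestion is to diff the metastore schema against the merged Parquet schema and emit the matching DDL. The sketch below is a toy model with hypothetical helper names. Schemas are lists of (column, type) pairs, and types are compared as plain strings. It is not how Spark's schema merging is implemented, and the resulting statement would still have to be run, e.g. through HiveContext.sql().

```python
def merge_schemas(*schemas):
    """Union several schemas given as lists of (column, type) pairs,
    keeping first-seen column order and rejecting conflicting types."""
    merged = {}
    order = []
    for schema in schemas:
        for name, dtype in schema:
            if name not in merged:
                merged[name] = dtype
                order.append(name)
            elif merged[name] != dtype:
                raise ValueError("column {} has conflicting types {} and {}".format(
                    name, merged[name], dtype))
    return [(name, merged[name]) for name in order]


def add_columns_ddl(table, old_schema, new_schema):
    """Return an ALTER TABLE ... ADD COLUMNS statement for the columns that
    exist only in new_schema, or None if nothing needs to be added."""
    existing = {name for name, _ in old_schema}
    added = [(name, dtype) for name, dtype in new_schema if name not in existing]
    if not added:
        return None
    columns = ", ".join("{} {}".format(name, dtype) for name, dtype in added)
    return "ALTER TABLE {} ADD COLUMNS ({})".format(table, columns)
```

With Cheng's example, merging t1's (c0 INT, c1 DOUBLE) with the appended data's extra c2 yields ALTER TABLE t1 ADD COLUMNS (c2 STRING).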
Re: Is spark suitable for real time query
My code looks like this:

Map<String, String> t11opt = new HashMap<String, String>();
t11opt.put("url", DB_URL);
t11opt.put("dbtable", "t11");
DataFrame t11 = sqlContext.load("jdbc", t11opt);
t11.registerTempTable("t11");
// ...the same for t12, t21, t22
DataFrame t1 = t11.unionAll(t12);
t1.registerTempTable("t1");
DataFrame t2 = t21.unionAll(t22);
t2.registerTempTable("t2");
for (int i = 0; i < 10; i++) {
    System.out.println(new Date(System.currentTimeMillis()));
    DataFrame crossjoin = sqlContext.sql("select txt from t1 join t2 on t1.id = t2.id");
    crossjoin.show();
    System.out.println(new Date(System.currentTimeMillis()));
}

Here t11, t12, t21 and t22 are all table DataFrames loaded over JDBC from a MySQL database that runs locally alongside the Spark job. But each loop iteration takes about 3 seconds, and I do not know why it takes so long. 2015-07-22 19:52 GMT+08:00 Robin East robin.e...@xense.co.uk: Here's an example using spark-shell on my laptop:

sc.textFile("LICENSE").filter(_ contains "Spark").count

This takes less than a second the first time I run it and is instantaneous on every subsequent run. What code are you running? On 22 Jul 2015, at 12:34, Louis Hust louis.h...@gmail.com wrote: I did a simple test using Spark in standalone mode (not a cluster) and found that a simple action takes a few seconds, even though the data is small, just a few rows. So does each Spark job spend some time on initialization or preparation no matter what the job is? In other words, does the basic framework of a Spark job cost seconds? 2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk: Real-time is, of course, relative, but you've mentioned the microsecond level. Spark is designed to process large amounts of data in a distributed fashion. No distributed system I know of could give any kind of guarantees at the microsecond level. Robin On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote: Hi all, I am using a Spark jar in standalone mode to fetch data from different MySQL instances and do some actions, but I found that the time is at the second level.
So i want to know if spark job is suitable for real time query which at microseconds?
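The loop in the first message brackets each query with two printed Date values. A minimal sketch of measuring the per-iteration time directly, so a slow first (cold) run can be told apart from later (warm) runs, as in Robin's observation, might look like this. The class and method names (QueryTimer, timeMillis, timeRepeatedly) and the sleeping stand-in workload are hypothetical; in the real job the Runnable would wrap the sqlContext.sql(...).show() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class QueryTimer {

    // Runs the action once and returns the elapsed time in milliseconds.
    // System.nanoTime() is monotonic, so it suits interval measurement better
    // than comparing two Date / currentTimeMillis() readings.
    static long timeMillis(Runnable action) {
        long start = System.nanoTime();
        action.run();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Times the same action several times and keeps every measurement,
    // so the first run can be compared with the following ones.
    static List<Long> timeRepeatedly(Runnable action, int iterations) {
        List<Long> timings = new ArrayList<>();
        for (int i = 0; i < iterations; i++) {
            timings.add(timeMillis(action));
        }
        return timings;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in workload; in the Spark job this would be something like
        // () -> sqlContext.sql("select txt from t1 join t2 on t1.id = t2.id").show()
        Runnable query = () -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<Long> timings = timeRepeatedly(query, 10);
        for (int i = 0; i < timings.size(); i++) {
            System.out.println("iteration " + i + ": " + timings.get(i) + " ms");
        }
    }
}
```

If only the first iteration is slow, the cost is mostly one-time setup; if every iteration takes about 3 seconds, the time is spent in each query itself (for example, re-reading the JDBC tables on every run).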
Re: use S3-Compatible Storage with spark
I could get a little further:
- installed spark-1.4.1-without-hadoop
- unpacked hadoop 2.7.1
- added the following in spark-env.sh:
HADOOP_HOME=/opt/hadoop-2.7.1/
SPARK_DIST_CLASSPATH=/opt/hadoop-2.7.1/opt/hadoop-2.7.1/share/hadoop/tools/lib/*/share/hadoop/tools/lib/*:/opt/hadoop-2.7.1/etc/hadoop:/opt/hadoop-2.7.1/share/hadoop/common/lib/*:/opt/had$
- started spark-shell with:
bin/spark-shell --jars /opt/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar

Now spark-shell starts with:
spark.SparkContext: Added JAR file:/opt/hadoop-2.7.1/share/hadoop/tools/lib/hadoop-aws-2.7.1.jar at http://185.19.29.91:46368/jars/hadoop-aws-2.7.1.jar with timestamp 1437575186830

But when trying to access S3 I get:
java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated

In fact it doesn't even matter whether I use s3n or s3a; the error is the same (strange!).

2015-07-22 12:19 GMT+02:00 Thomas Demoor thomas.dem...@hgst.com:
You need to get the hadoop-aws.jar from hadoop-tools (use Hadoop 2.7+). You can get the source and build it with mvn, or get it from a prebuilt Hadoop distro. Then, when you run your Spark job, add --jars path/to/thejar

From: Schmirr Wurst schmirrwu...@gmail.com
Sent: Wednesday, July 22, 2015 12:06 PM
To: Thomas Demoor
Subject: Re: use S3-Compatible Storage with spark
Hi Thomas, thanks. Could you just tell me what exactly I need to do? I'm not familiar with Java programming.
- Where do I get the jar from? Do I need to compile it with mvn?
- Where should I update the classpath, and how?

2015-07-22 11:55 GMT+02:00 Thomas Demoor thomas.dem...@hgst.com:
The classes are not found. Is the jar on your classpath? Take care: there are multiple S3 connectors in Hadoop: the legacy s3n, based on the 3rd-party S3 library Jets3t, and the recent s3a (functional since Hadoop 2.7), based on the Amazon SDK. Make sure you stick to one: either use fs.s3a.endpoint with s3a://bucket/object URLs, or fs.s3n.endpoint with s3n://bucket/object URLs. I recommend s3a, but I'm biased :P Regards, Thomas

From: Schmirr Wurst schmirrwu...@gmail.com
Sent: Tuesday, July 21, 2015 11:59 AM
To: Akhil Das
Cc: user@spark.apache.org
Subject: Re: use S3-Compatible Storage with spark
Which version do you have?
- I tried with Spark 1.4.1 for hdp 2.6, but there the AWS module is somehow missing:
java.io.IOException: No FileSystem for scheme: s3n
and the same for s3a:
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
- On Spark 1.4.1 for hdp 2.4, the module is there and works out of the box for s3n (except for the endpoint), but I get java.io.IOException: No FileSystem for scheme: s3a :-|

2015-07-21 11:09 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
Did you try with s3a? It seems it's more of an issue with Hadoop. Thanks Best Regards

On Tue, Jul 21, 2015 at 2:31 PM, Schmirr Wurst schmirrwu...@gmail.com wrote:
It seems to work for the credentials, but the endpoint is ignored. I've changed it to
sc.hadoopConfiguration.set("fs.s3n.endpoint", "test.com")
and I continue to get my data from Amazon. How can that be? (I also use s3n in my text URL.)

2015-07-21 9:30 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
You can add the jar to the classpath, and you can set the property like:
sc.hadoopConfiguration.set("fs.s3a.endpoint", "storage.sigmoid.com")
Thanks Best Regards

On Mon, Jul 20, 2015 at 9:41 PM, Schmirr Wurst schmirrwu...@gmail.com wrote:
Thanks, that is what I was looking for... Any idea where I have to store and reference the corresponding hadoop-aws-2.6.0.jar? java.io.IOException: No FileSystem for scheme: s3n

2015-07-20 8:33 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
Not in the URI, but you can specify it in the Hadoop configuration:
<property>
  <name>fs.s3a.endpoint</name>
  <description>AWS S3 endpoint to connect to. An up-to-date list is provided in the AWS Documentation: regions and endpoints. Without this property, the standard region (s3.amazonaws.com) is assumed.</description>
</property>
Thanks Best Regards

On Sun, Jul 19, 2015 at 9:13 PM, Schmirr Wurst schmirrwu...@gmail.com wrote:
I want to use Pithos. Where can I specify that endpoint? Is it possible in the URL?

2015-07-19 17:22 GMT+02:00 Akhil Das ak...@sigmoidanalytics.com:
Could you name the storage service that you are using? Most of them provide an S3-like REST API endpoint for you to hit. Thanks Best Regards

On Fri, Jul 17, 2015 at 2:06 PM, Schmirr Wurst schmirrwu...@gmail.com wrote:
Hi, I wonder how to use S3-compatible storage in Spark? If I'm using the s3n:// URL schema, then it will point to
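Two problems recur in this thread: the connector class is not on the classpath ("No FileSystem for scheme", ClassNotFoundException), and the endpoint key does not belong to the connector named by the URL scheme. A small plain-Java diagnostic sketch of both checks follows. The two Hadoop class names are the s3a class quoted in the errors above and the Hadoop 2.x s3n implementation; the helper class and method names are hypothetical, and bucket/object are placeholders. Run it with the same classpath you give Spark.

```java
import java.net.URI;

public class S3ConnectorCheck {

    // Maps a URL scheme to the Hadoop FileSystem class that serves it
    // (s3a: Amazon-SDK based connector, s3n: legacy Jets3t-based connector).
    static String fileSystemClassFor(String scheme) {
        if ("s3a".equals(scheme)) {
            return "org.apache.hadoop.fs.s3a.S3AFileSystem";
        }
        if ("s3n".equals(scheme)) {
            return "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
        }
        throw new IllegalArgumentException("Not an s3a/s3n scheme: " + scheme);
    }

    // Returns the endpoint key of the connector named by the URL,
    // e.g. s3a://bucket/object -> fs.s3a.endpoint, following the
    // "stick to one connector" advice above.
    static String endpointKeyFor(URI url) {
        String scheme = url.getScheme();
        fileSystemClassFor(scheme); // throws for anything other than s3a/s3n
        return "fs." + scheme + ".endpoint";
    }

    // True if the class can be loaded from the current classpath. initialize=false
    // skips static initializers; a missing superclass surfaces as a LinkageError
    // and is reported as false.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, S3ConnectorCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String[] urls = {"s3a://bucket/object", "s3n://bucket/object"};
        for (String u : urls) {
            URI url = URI.create(u);
            String fsClass = fileSystemClassFor(url.getScheme());
            System.out.println(u + " -> endpoint key " + endpointKeyFor(url) + ", " + fsClass
                + (isOnClasspath(fsClass) ? " found" : " NOT on classpath"));
        }
    }
}
```

A class reported as found here can still fail with "could not be instantiated" at runtime; that would point to one of its own dependencies (for s3a, possibly the Amazon SDK it is built on) being absent, which this check does not verify.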
Re: Scaling spark cluster for a running application
Are you running the Spark cluster in standalone mode or on YARN? In standalone mode, the application gets the available resources when it starts. With YARN, you can try turning on the setting *spark.dynamicAllocation.enabled*. See https://spark.apache.org/docs/latest/configuration.html

On Wed, Jul 22, 2015 at 2:20 PM phagunbaya phagun.b...@falkonry.com wrote:
I have a Spark cluster running in client mode, with the driver outside the Spark cluster. I want to scale the cluster after an application is submitted. To do this, I'm creating new workers, and they are getting registered with the master, but the issue I'm seeing is that the running application does not use the newly added workers. Hence I cannot add more resources to an existing running application. Is there any other way or config to deal with this use case? How can I make a running application ask for executors on a newly added worker node?
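A hedged sketch of the settings involved, written as plain Java that renders them as spark-submit --conf key=value arguments. spark.dynamicAllocation.enabled comes from the reply above; spark.shuffle.service.enabled is included because dynamic allocation relies on the external shuffle service. The min/max executor counts are placeholder values, not recommendations, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DynamicAllocationSettings {

    // Collects the dynamic-allocation settings in a stable order.
    // minExecutors/maxExecutors are illustrative bounds chosen by the caller.
    static Map<String, String> settings(int minExecutors, int maxExecutors) {
        if (minExecutors < 0 || maxExecutors < minExecutors) {
            throw new IllegalArgumentException("need 0 <= min <= max");
        }
        Map<String, String> conf = new LinkedHashMap<>();
        conf.put("spark.dynamicAllocation.enabled", "true");
        conf.put("spark.shuffle.service.enabled", "true");
        conf.put("spark.dynamicAllocation.minExecutors", Integer.toString(minExecutors));
        conf.put("spark.dynamicAllocation.maxExecutors", Integer.toString(maxExecutors));
        return conf;
    }

    // Renders the settings as spark-submit arguments: --conf key=value ...
    static String toSubmitArgs(Map<String, String> conf) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : conf.entrySet()) {
            if (sb.length() > 0) {
                sb.append(' ');
            }
            sb.append("--conf ").append(e.getKey()).append('=').append(e.getValue());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toSubmitArgs(settings(1, 10)));
    }
}
```

On YARN, the shuffle service itself also has to be set up on each NodeManager, as described in the Spark documentation on dynamic resource allocation; the flags alone are not enough.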
Re: Is spark suitable for real time query
Are you using the JDBC server? Paolo

From: Louis Hust (louis.h...@gmail.com)
Sent: 22/07/2015 13:47
To: Robin East (robin.e...@xense.co.uk)
Cc: user@spark.apache.org
Subject: Re: Is spark suitable for real time query
I did a simple test using Spark in standalone mode (not a cluster) and found that a simple action takes a few seconds, even though the data is small, just a few rows. So does each Spark job spend some time on initialization or preparation work, no matter what the job is? I mean, does the basic framework of a Spark job cost seconds?

2015-07-22 19:17 GMT+08:00 Robin East robin.e...@xense.co.uk:
Real-time is, of course, relative but you’ve mentioned microsecond level. Spark is designed to process large amounts of data in a distributed fashion. No distributed system I know of could give any kind of guarantees at the microsecond level. Robin

On 22 Jul 2015, at 11:14, Louis Hust louis.h...@gmail.com wrote:
Hi, all. I am using a Spark jar in standalone mode, fetching data from different MySQL instances and doing some actions, but I found the time is at the second level. So I want to know whether a Spark job is suitable for real-time queries at the microsecond level.
Applications metrics unseparatable from Master metrics?
Hi, I tried to enable the Master metrics source (to get the number of running/waiting applications, etc.) and connected it to Graphite. However, when it is enabled, application metrics are also sent. Is it possible to separate them and send only the master metrics, without the application metrics? I see that the Master class registers both: https://github.com/apache/spark/blob/b9a922e260bec1b211437f020be37fab46a85db0/core/src/main/scala/org/apache/spark/deploy/master/Master.scala#L91 Thanks, RK.
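One possible angle, offered as an assumption rather than a confirmed fix: the Master appears to create two metrics systems, under the instance names "master" and "applications", and metrics.properties keys have the form [instance].sink|source.[name].[option], where "*" applies to every instance. If the Graphite sink was configured with the "*" wildcard, both instances would report; configuring it under "master." only would not apply to "applications". The sketch below is a simplified model of that wildcard-then-override rule, not Spark's own MetricsConfig code; the class name and the Graphite host are hypothetical.

```java
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

public class MetricsInstanceConfig {

    // Simplified model: "*." keys act as defaults for every instance, and keys
    // starting with "<instance>." override the wildcard key with the same suffix.
    static Map<String, String> effectiveFor(Properties props, String instance) {
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith("*.")) {
                result.put(key.substring(2), props.getProperty(key));
            }
        }
        String prefix = instance + ".";
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    // True if the resolved configuration for the instance defines a Graphite sink class.
    static boolean sendsToGraphite(Properties props, String instance) {
        return effectiveFor(props, instance).containsKey("sink.graphite.class");
    }

    public static void main(String[] args) {
        String graphite = "org.apache.spark.metrics.sink.GraphiteSink";

        // Wildcard sink: applies to "master" and "applications" alike.
        Properties wildcard = new Properties();
        wildcard.setProperty("*.sink.graphite.class", graphite);

        // Master-only sink: applies to the "master" instance only.
        Properties masterOnly = new Properties();
        masterOnly.setProperty("master.sink.graphite.class", graphite);
        masterOnly.setProperty("master.sink.graphite.host", "graphite.example.com"); // hypothetical host
        masterOnly.setProperty("master.sink.graphite.port", "2003");

        for (String instance : new String[] {"master", "applications"}) {
            System.out.println(instance + ": wildcard=" + sendsToGraphite(wildcard, instance)
                + ", masterOnly=" + sendsToGraphite(masterOnly, instance));
        }
    }
}
```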