Re: Long-Running Spark application doesn't clean old shuffle data correctly
Hi Alex, Shuffle files in Spark are deleted when the object holding a reference to the shuffle file on disk goes out of scope (is garbage collected by the JVM). Could it be the case that you are keeping these objects alive? Regards, Keith. http://keith-chapman.com On Sun, Jul 21, 2019 at 12:19 AM Alex Landa wrote: > Thanks, > I looked into these options, the cleaner periodic interval is set to 30 > min by default. > The blocking option for shuffle - > *spark.cleaner.referenceTracking.blocking.shuffle* - is set to false by > default. > What are the implications of setting it to true? > Will it make the driver slower? > > Thanks, > Alex > > On Sun, Jul 21, 2019 at 9:06 AM Prathmesh Ranaut Gmail < > prathmesh.ran...@gmail.com> wrote: > >> This is the job of ContextCleaner. There are a few properties that you can >> tweak to see if that helps: >> spark.cleaner.periodicGC.interval >> spark.cleaner.referenceTracking >> spark.cleaner.referenceTracking.blocking.shuffle >> >> Regards >> Prathmesh Ranaut >> >> On Jul 21, 2019, at 11:31 AM, Alex Landa wrote: >> >> Hi, >> >> We are running a long-running Spark application (which executes lots of >> quick jobs using our scheduler) on a Spark standalone cluster (2.4.0). >> We see that old shuffle files (a week old, for example) are not deleted >> during the execution of the application, which leads to out-of-disk-space >> errors on the executor. >> If we re-deploy the application, the Spark cluster takes care of the >> cleaning >> and deletes the old shuffle data (since we have >> /-Dspark.worker.cleanup.enabled=true/ in the worker config). >> I don't want to re-deploy our app every week or two, but to be able to >> configure Spark to clean old shuffle data (as it should). >> >> How can I configure Spark to delete old shuffle data during the lifetime >> of >> the application (not after it)? >> >> >> Thanks, >> Alex >> >>
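For reference, a minimal sketch of tuning the cleaner properties discussed above when building the session (the interval value is illustrative, not a recommendation):

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.cleaner.periodicGC.interval", "15min") // default is 30min; trigger driver-side GC more often
  .set("spark.cleaner.referenceTracking.blocking.shuffle", "true") // make the cleaner thread block on shuffle cleanup
val spark = SparkSession.builder().config(conf).getOrCreate()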
Re: Sorting tuples with byte key and byte value
Hi Supun, A couple of things with regard to your question. --executor-cores means the number of worker threads per VM; according to your requirement this should be set to 8. *repartitionAndSortWithinPartitions* is an RDD operation, and RDD operations in Spark are not performant, in terms of both execution and memory. I would rather use the DataFrame sort operation if performance is key. Regards, Keith. http://keith-chapman.com On Mon, Jul 15, 2019 at 8:45 AM Supun Kamburugamuve < supun.kamburugam...@gmail.com> wrote: > Hi all, > > We are trying to measure the sorting performance of Spark. We have a 16 > node cluster with 48 cores and 256GB of RAM in each machine and a 10Gbps > network. > > Let's say we are running with 128 parallel tasks and each partition > generates about 1GB of data (total 128GB). > > We are using the method *repartitionAndSortWithinPartitions* > > A standalone cluster is used with the following configuration. > > SPARK_WORKER_CORES=1 > SPARK_WORKER_MEMORY=16G > SPARK_WORKER_INSTANCES=8 > > --executor-memory 16G --executor-cores 1 --num-executors 128 > > I believe this sets 128 executors to run the job, each having 16GB of > memory and spread across 16 nodes with 8 threads in each node. This > configuration runs very slowly. The program doesn't use disks to read or > write data (data is generated in-memory and we don't write to file after > sorting). > > It seems even though the data size is small, it uses disk for the shuffle. > We are not sure our configurations are optimal to achieve the best > performance. > > Best, > Supun.
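A rough sketch of the DataFrame-based alternative (df and the key column are hypothetical):

import spark.implicits._

// partition by the key and sort within each partition, analogous to repartitionAndSortWithinPartitions
val partitionedSorted = df.repartition(128, $"key").sortWithinPartitions($"key")
// or a global sort across all partitions:
val globallySorted = df.sort($"key")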
Re: Override jars in spark submit
Hi Naresh, You could use "--conf spark.driver.extraClassPath=<path-to-jar>". Note that the jar will not be shipped to the executors; if it's a class that is needed on the executors as well you should provide "--conf spark.executor.extraClassPath=<path-to-jar>". Note that if you do provide the executor extraClassPath, the jar file needs to be present on all the executors. Regards, Keith. http://keith-chapman.com On Wed, Jun 19, 2019 at 8:57 PM naresh Goud wrote: > Hello All, > > How can we override jars in spark submit? > We have hive-exec-spark jar which is available as part of default spark > cluster jars. > We wanted to override the above-mentioned jar in spark submit with a newer > version of the jar. > How do we do that? > > > Thank you, > Naresh > -- > Thanks, > Naresh > www.linkedin.com/in/naresh-dulam > http://hadoopandspark.blogspot.com/ > >
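For example, a spark-submit invocation along these lines (paths and class name are hypothetical):

spark-submit \
  --conf spark.driver.extraClassPath=/opt/jars/hive-exec-override.jar \
  --conf spark.executor.extraClassPath=/opt/jars/hive-exec-override.jar \
  --class com.example.MyJob my-app.jar

Since extraClassPath entries are prepended to the classpath, the newer jar shadows the cluster's copy, but it must already exist at that path on the driver and on every executor node.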
Re: [pyspark 2.3] count followed by write on dataframe
Yes, that is correct; that would cause the computation to happen twice. If you want the computation to happen only once you can cache the dataframe and call count and write on the cached dataframe. Regards, Keith. http://keith-chapman.com On Mon, May 20, 2019 at 6:43 PM Rishi Shah wrote: > Hi All, > > Just wanted to confirm my understanding around actions on a dataframe. If > the dataframe is not persisted at any point, & count() is called on the dataframe > followed by a write action --> this would trigger dataframe computation twice > (which could be a performance hit for a larger dataframe). Could anyone > please help confirm? > > -- > Regards, > > Rishi Shah >
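A minimal sketch of the suggested pattern (the output path is hypothetical):

val cached = df.cache()           // mark the dataframe for caching
val n = cached.count()            // first action: computes the plan and populates the cache
cached.write.parquet("/tmp/out")  // second action: reads from the cache instead of recomputing
cached.unpersist()                // release the cached blocks when done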
RE: how to use cluster sparkSession like localSession
Hello, I think you can try the below; the reason is that only yarn-client mode is supported for your scenario. master("yarn-client") Thanks very much. Keith From: 张万新 Sent: Thursday, November 1, 2018 11:36 PM To: 崔苗 (Data & AI Product Development Dept.) <0049003...@znv.com> Cc: user Subject: Re: how to use cluster sparkSession like localSession I think you should investigate Apache Zeppelin and Livy 崔苗 (Data & AI Product Development Dept.) <0049003...@znv.com> wrote on Fri, Nov 2, 2018 at 11:01: Hi, we want to execute Spark code without submitting an application.jar, like this code: public static void main(String args[]) throws Exception{ SparkSession spark = SparkSession .builder() .master("local[*]") .appName("spark test") .getOrCreate(); Dataset<Row> testData = spark.read().csv(".\\src\\main\\java\\Resources\\no_schema_iris.scv"); testData.printSchema(); testData.show(); } The above code works well in IDEA; we do not need to generate a jar file and submit it. But if we replace master("local[*]") with master("yarn"), it doesn't work, so is there a way to use a cluster SparkSession like a local SparkSession? We need to dynamically execute Spark code in a web server according to the different requests; for example, a filter request will call dataset.filter(), so there is no application.jar to submit. 0049003208 0049003...@znv.com Signature customized by NetEase Mail Master (https://mail.163.com/dashi/dlpro.html)
Pyspark error when converting string to timestamp in map function
Hi all, I'm trying to create a dataframe enforcing a schema so that I can write it to a parquet file. The schema has timestamps and I get an error with pyspark. The following is a snippet of code that exhibits the problem: df = sqlctx.range(1000) schema = StructType([StructField('a', TimestampType(), True)]) df1 = sqlctx.createDataFrame(df.rdd.map(row_gen_func), schema) row_gen_func is a function that returns timestamp strings of the form "2018-03-21 11:09:44" When I run this with Spark 2.2 I get the following error: raise TypeError("%s can not accept object %r in type %s" % (dataType, obj, type(obj))) TypeError: TimestampType can not accept object '2018-03-21 08:06:17' in type Regards, Keith. http://keith-chapman.com
Re: GC- Yarn vs Standalone K8
Spark on EMR is configured to use the CMS GC, specifically the following flags: spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' Regards, Keith. http://keith-chapman.com On Mon, Jun 11, 2018 at 8:22 PM, ankit jain wrote: > Hi, > Does anybody know if Yarn uses a different Garbage Collector from Spark > standalone? > > We migrated our application recently from EMR to K8 (not using native spark > on k8 yet) and see quite a bit of performance degradation. > > Diving further, it seems garbage collection is running too often, up to 50% > of task time even with a small amount of data - PFA Spark UI screenshot. > > I have updated GC to G1GC and it has helped a bit - GC time has come down > from 50% to 30%, still too high though. > > Also enabled -verbose:gc, so there will be many more metrics to play with, but > any pointers meanwhile will be appreciated. > > > -- > Thanks & Regards, > Ankit. >
Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder
Hi Michael, sorry for the late reply. I guess you may have to set it through the hdfs core-site.xml file. The property you need to set is "hadoop.tmp.dir", which defaults to "/tmp/hadoop-${user.name}". Regards, Keith. http://keith-chapman.com On Mon, Mar 19, 2018 at 1:05 PM, Michael Shtelma wrote: > Hi Keith, > > Thank you for the idea! > I have tried it, so now the executor command looks like this: > > /bin/bash -c /usr/java/latest//bin/java -server -Xmx51200m > '-Djava.io.tmpdir=my_prefered_path' > -Djava.io.tmpdir=/tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_1521110306769_0041/container_1521110306769_0041_01_04/tmp > > The JVM is using the second -Djava.io.tmpdir parameter and writing > everything to the same directory as before. > > Best, > Michael > Sincerely, > Michael Shtelma > > > On Mon, Mar 19, 2018 at 6:38 PM, Keith Chapman > wrote: > > Can you try setting spark.executor.extraJavaOptions to have > > -Djava.io.tmpdir=someValue > > > > Regards, > > Keith. > > > > http://keith-chapman.com > > > > On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma > > wrote: > >> > >> Hi Keith, > >> > >> Thank you for your answer! > >> I have done this, and it is working for the Spark driver. > >> I would like to make something like this for the executors as well, so > >> that the setting will be used on all the nodes, where I have executors > >> running. > >> > >> Best, > >> Michael > >> > >> > >> On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman > > >> wrote: > >> > Hi Michael, > >> > > >> > You could either set spark.local.dir through spark conf or > >> > java.io.tmpdir > >> > system property. > >> > > >> > Regards, > >> > Keith. > >> > > >> > http://keith-chapman.com > >> > > >> > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma > >> > wrote: > >> >> > >> >> Hi everybody, > >> >> > >> >> I am running a spark job on yarn, and my problem is that the blockmgr-* > >> >> folders are being created under > >> >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/* > >> >> The size of this folder can grow to a significant size and does not > >> >> really fit into the /tmp file system for one job, which makes a real > >> >> problem for my installation. > >> >> I have redefined hadoop.tmp.dir in core-site.xml and > >> >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to another > >> >> location and expected that the block manager will create the files > >> >> there and not under /tmp, but this is not the case. The files are > >> >> created under /tmp. > >> >> > >> >> I am wondering if there is a way to make spark not use /tmp at all and > >> >> configure it to create all the files somewhere else? > >> >> > >> >> Any assistance would be greatly appreciated! > >> >> > >> >> Best, > >> >> Michael
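For example, the core-site.xml entry might look like this (the path is an illustrative placeholder):

<property>
  <name>hadoop.tmp.dir</name>
  <value>/data/hadoop-tmp/${user.name}</value>
</property>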
Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder
Can you try setting spark.executor.extraJavaOptions to have -Djava.io.tmpdir=someValue Regards, Keith. http://keith-chapman.com On Mon, Mar 19, 2018 at 10:29 AM, Michael Shtelma wrote: > Hi Keith, > > Thank you for your answer! > I have done this, and it is working for the Spark driver. > I would like to make something like this for the executors as well, so > that the setting will be used on all the nodes, where I have executors > running. > > Best, > Michael > > > On Mon, Mar 19, 2018 at 6:07 PM, Keith Chapman > wrote: > > Hi Michael, > > > > You could either set spark.local.dir through spark conf or java.io.tmpdir > > system property. > > > > Regards, > > Keith. > > > > http://keith-chapman.com > > > > On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma > wrote: > >> > >> Hi everybody, > >> > >> I am running a spark job on yarn, and my problem is that the blockmgr-* > >> folders are being created under > >> /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/* > >> The size of this folder can grow to a significant size and does not > >> really fit into /tmp file system for one job, which makes a real > >> problem for my installation. > >> I have redefined hadoop.tmp.dir in core-site.xml and > >> yarn.nodemanager.local-dirs in yarn-site.xml pointing to other > >> location and expected that the block manager will create the files > >> there and not under /tmp, but this is not the case. The files are > >> created under /tmp. > >> > >> I am wondering if there is a way to make spark not use /tmp at all and > >> configure it to create all the files somewhere else ? > >> > >> Any assistance would be greatly appreciated! > >> > >> Best, > >> Michael > >> > > >
Re: Running out of space on /tmp file system while running spark job on yarn because of size of blockmgr folder
Hi Michael, You could either set spark.local.dir through spark conf or java.io.tmpdir system property. Regards, Keith. http://keith-chapman.com On Mon, Mar 19, 2018 at 9:59 AM, Michael Shtelma wrote: > Hi everybody, > > I am running spark job on yarn, and my problem is that the blockmgr-* > folders are being created under > /tmp/hadoop-msh/nm-local-dir/usercache/msh/appcache/application_id/* > The size of this folder can grow to a significant size and does not > really fit into /tmp file system for one job, which makes a real > problem for my installation. > I have redefined hadoop.tmp.dir in core-site.xml and > yarn.nodemanager.local-dirs in yarn-site.xml pointing to other > location and expected that the block manager will create the files > there and not under /tmp, but this is not the case. The files are > created under /tmp. > > I am wondering if there is a way to make spark not use /tmp at all and > configure it to create all the files somewhere else ? > > Any assistance would be greatly appreciated! > > Best, > Michael > >
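A sketch of the first suggestion, redirecting Spark's scratch space (the path is illustrative):

--conf spark.local.dir=/data/spark-scratch

or, equivalently, in code:

val conf = new SparkConf().set("spark.local.dir", "/data/spark-scratch")

Note that on YARN this setting is overridden by the node manager's local directories (yarn.nodemanager.local-dirs), which is consistent with the behavior Michael reports in his follow-ups.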
Can I get my custom spark strategy to run last?
Hi, I'd like to write a custom Spark strategy that runs after all the existing Spark strategies are run. Looking through the Spark code it seems like the custom strategies are prepended to the list of strategies in Spark. Is there a way I could get it to run last? Regards, Keith. http://keith-chapman.com
Re: Spark not releasing shuffle files in time (with very large heap)
My issue is that there is not enough pressure on GC, hence GC is not kicking in fast enough to delete the shuffle files of previous iterations. Regards, Keith. http://keith-chapman.com On Thu, Feb 22, 2018 at 6:58 PM, naresh Goud wrote: > It would be very difficult to tell without knowing what your > application code is doing and what kind of transformations/actions it performs. > From my previous experience, tuning application code to avoid > unnecessary objects reduces pressure on GC. > > > On Thu, Feb 22, 2018 at 2:13 AM, Keith Chapman > wrote: > >> Hi, >> >> I'm benchmarking a spark application by running it for multiple >> iterations; it's a benchmark that's heavy on shuffle and I run it on a local >> machine with a very large heap (~200GB). The system has an SSD. When running >> for 3 to 4 iterations I get into a situation where I run out of disk space >> on the /tmp directory. On further investigation I was able to figure out >> that the reason for this is that the shuffle files are still around; >> because I have a very large heap, GC has not happened and hence the shuffle >> files are not deleted. I was able to confirm this by lowering the heap size, >> and I see GC kicking in more often and the size of /tmp stays under >> control. Is there any way I could configure spark to handle this issue? >> >> One option that I have is to have GC run more often by >> setting spark.cleaner.periodicGC.interval to a much lower value. Is >> there a cleaner solution? >> >> Regards, >> Keith. >> >> http://keith-chapman.com >> > >
Spark not releasing shuffle files in time (with very large heap)
Hi, I'm benchmarking a spark application by running it for multiple iterations; it's a benchmark that's heavy on shuffle and I run it on a local machine with a very large heap (~200GB). The system has an SSD. When running for 3 to 4 iterations I get into a situation where I run out of disk space on the /tmp directory. On further investigation I was able to figure out that the reason for this is that the shuffle files are still around; because I have a very large heap, GC has not happened and hence the shuffle files are not deleted. I was able to confirm this by lowering the heap size, and I see GC kicking in more often and the size of /tmp stays under control. Is there any way I could configure spark to handle this issue? One option that I have is to have GC run more often by setting spark.cleaner.periodicGC.interval to a much lower value. Is there a cleaner solution? Regards, Keith. http://keith-chapman.com
Re: update LD_LIBRARY_PATH when running apache job in a YARN cluster
Hi Manuel, You could use the following to add a path to the library search path: --conf spark.driver.extraLibraryPath=PathToLibFolder --conf spark.executor.extraLibraryPath=PathToLibFolder Thanks, Keith. Regards, Keith. http://keith-chapman.com On Wed, Jan 17, 2018 at 5:39 PM, Manuel Sopena Ballesteros < manuel...@garvan.org.au> wrote: > Dear Spark community, > > > > I have Spark running in a YARN cluster and I am getting an error when > trying to run my Python application. > > > > /home/mansop/virtenv/bin/python2.7: error while loading shared libraries: > libpython2.7.so.1.0: cannot open shared object file: No such file or > directory > > > > Is there a way to specify the LD_LIBRARY_PATH in the spark-submit command > or in the config file? > > > > > > *Manuel Sopena Ballesteros* | Big data Engineer > *Garvan Institute of Medical Research* > The Kinghorn Cancer Centre, 370 Victoria Street, Darlinghurst, NSW 2010 > *T:* +61 (0)2 9355 5760 | *F:* +61 (0)2 9295 8507 | *E:* manuel...@garvan.org.au > > NOTICE > Please consider the environment before printing this email. This message > and any attachments are intended for the addressee named and may contain > legally privileged/confidential/copyright information. If you are not the > intended recipient, you should not read, use, disclose, copy or distribute > this communication. If you have received this message in error please > notify us at once by return email and then delete both messages. We accept > no liability for the distribution of viruses or similar in electronic > communications. This notice should not be removed. >
How to find the temporary views' DDL
Hello, Is there a way to find the DDL of a "temporary" view created in the current session with Spark SQL? For example: create or replace temporary view tmp_v as select c1 from table_x; "Show create table" does not work for this case as it is not a table. "Describe" shows the columns but not the DDL. Thanks very much. Keith From: Anastasios Zouzias [mailto:zouz...@gmail.com] Sent: Sunday, October 1, 2017 3:05 PM To: Kanagha Kumar Cc: user @spark Subject: Re: Error - Spark reading from HDFS via dataframes - Java Hi, Set the inferSchema option to true in spark-csv. You may also want to set the mode option. See the readme below: https://github.com/databricks/spark-csv/blob/master/README.md Best, Anastasios On 01.10.2017 07:58, "Kanagha Kumar" <kpra...@salesforce.com> wrote: Hi, I'm trying to read data from HDFS in Spark as dataframes. Printing the schema, I see all columns are being read as strings. I'm converting it to RDDs and creating another dataframe by passing in the correct schema (how the rows should be interpreted finally). I'm getting the following error: Caused by: java.lang.RuntimeException: java.lang.String is not a valid external type for schema of bigint Spark read API: Dataset<Row> hdfs_dataset = new SQLContext(spark).read().option("header", "false").csv("hdfs:/inputpath/*"); Dataset<Row> ds = new SQLContext(spark).createDataFrame(hdfs_dataset.toJavaRDD(), conversionSchema); This is the schema to be converted to: StructType(StructField(COL1,StringType,true), StructField(COL2,StringType,true), StructField(COL3,LongType,true), StructField(COL4,StringType,true), StructField(COL5,StringType,true), StructField(COL6,LongType,true)) This is the original schema obtained once the read API was invoked: StructType(StructField(_c1,StringType,true), StructField(_c2,StringType,true), StructField(_c3,StringType,true), StructField(_c4,StringType,true), StructField(_c5,StringType,true), StructField(_c6,StringType,true)) My interpretation is that even when a JavaRDD is cast to a dataframe by passing in the new schema, values are not getting type cast. This is occurring because the above read API reads data as string types from HDFS. How can I convert an RDD to a dataframe by passing in the correct schema once it is read? How can the values be type cast correctly during this RDD-to-dataframe conversion? Or how can I read data from HDFS with an input schema in Java? Any suggestions are helpful. Thanks!
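A sketch of the schema-inference suggestion (Scala syntax; with Spark 2.x the csv source is built in, so the same options apply without the external spark-csv package):

val df = spark.read
  .option("header", "false")
  .option("inferSchema", "true")   // sample the data and infer numeric types instead of defaulting to strings
  .option("mode", "DROPMALFORMED") // drop rows that do not match the inferred schema
  .csv("hdfs:/inputpath/*")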
RE: A bug in spark or hadoop RPC with kerberos authentication?
Thanks for the reply, I filed an issue in JIRA: https://issues.apache.org/jira/browse/SPARK-21819 I submitted the job from the Java API, not by the spark-submit command line, as we want to offer Spark processing as a service. Configuration hc = new Configuration(false); String yarnxml=String.format("%s/%s", ConfigLocation,"yarn-site.xml"); String corexml=String.format("%s/%s", ConfigLocation,"core-site.xml"); String hdfsxml=String.format("%s/%s", ConfigLocation,"hdfs-site.xml"); String hivexml=String.format("%s/%s", ConfigLocation,"hive-site.xml"); hc.addResource(yarnxml); hc.addResource(corexml); hc.addResource(hdfsxml); hc.addResource(hivexml); //manually set all the Hadoop config in sparkconf SparkConf sc = new SparkConf(true); hc.forEach(entry-> { if(entry.getKey().startsWith("hive")) { sc.set(entry.getKey(), entry.getValue()); }else { sc.set("spark.hadoop."+entry.getKey(), entry.getValue()); } }); UserGroupInformation.setConfiguration(hc); UserGroupInformation.loginUserFromKeytab(Principal, Keytab); SparkSession sparkSession = SparkSession .builder() .master("yarn-client") //"yarn-client", "local" .config(sc) .appName(SparkEAZDebug.class.getName()) .enableHiveSupport() .getOrCreate(); Thanks very much. Keith From: 周康 [mailto:zhoukang199...@gmail.com] Sent: August 22, 2017 20:22 To: Sun, Keith Cc: user@spark.apache.org Subject: Re: A bug in spark or hadoop RPC with kerberos authentication? You can check out the Hadoop credential classes in the Spark YARN module. During spark-submit, it will use the config on the classpath. I wonder how you reference your own config?
RE: A bug in spark or hadoop RPC with kerberos authentication?
Finally found the root cause and raised a bug issue: https://issues.apache.org/jira/browse/SPARK-21819 Thanks very much. Keith From: Sun, Keith Sent: August 22, 2017 8:48 To: user@spark.apache.org Subject: A bug in spark or hadoop RPC with kerberos authentication? Hello, I met this very weird issue which, while easy to reproduce, stuck me for more than 1 day. I suspect this may be an issue/bug related to the class loader. Can you help confirm the root cause? I want to specify a customized Hadoop configuration set instead of those on the class path (we have a few Hadoop clusters, all with Kerberos security, and I want to support different configurations). Code/error like below. The workaround I found is to place a core-site.xml on the class path with the below 2 properties. By checking the RPC code under org.apache.hadoop.ipc.RPC, I suspect the RPC code may not see the UGI class in the same classloader, so UGI is initialized with the default value on the classpath, which is simple authentication. core-site.xml with the security setup on the classpath: <property><name>hadoop.security.authentication</name><value>kerberos</value></property> <property><name>hadoop.security.authorization</name><value>true</value></property> error-- 2673 [main] DEBUG org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil - DataTransferProtocol using SaslPropertiesResolver, configured QOP dfs.data.transfer.protection = privacy, configured class dfs.data.transfer.saslproperties.resolver.class = class org.apache.hadoop.security.WhitelistBasedResolver 2696 [main] DEBUG org.apache.hadoop.service.AbstractService - Service: org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state INITED 2744 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:x@xxxCOM (auth:KERBEROS) from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:136) // 2746 [main] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC - Creating YarnRPC for org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC 2746 [main] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC - Creating a HadoopYarnProtoRpc proxy for protocol interface org.apache.hadoop.yarn.api.ApplicationClientProtocol 2801 [main] DEBUG org.apache.hadoop.ipc.Client - getting client out of cache: org.apache.hadoop.ipc.Client@748fe51d 2981 [main] DEBUG org.apache.hadoop.service.AbstractService - Service org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started 3004 [main] DEBUG org.apache.hadoop.ipc.Client - The ping interval is 6 ms. 3005 [main] DEBUG org.apache.hadoop.ipc.Client - Connecting to yarn-rm-1/x:8032 3019 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx] DEBUG org.apache.hadoop.ipc.Client - IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx: starting, having connections 1 3020 [IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx sending #0 3025 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx] DEBUG org.apache.hadoop.ipc.Client - IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx got value #-1 3026 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx] DEBUG org.apache.hadoop.ipc.Client - closing ipc connection to yarn-rm-1/x:8032: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS] org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS] at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1131) at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979) ---code--- Configuration hc = new Configuration(false); hc.addResource("myconf/yarn-site.xml"); hc.addResource("myconf/core-site.xml"); hc.addResource("myconf/hdfs-site.xml"); hc.addResource("myconf/hive-site.xml"); SparkConf sc = new SparkConf(true); // add config in spark conf as no xml in the classpath except those "default.xml" from Hadoop jars. hc.forEach(entry-> { if(entry.getKey().startsWith("hive")) { sc.set(entry.getKey(), entry.getValue()); }else { sc.set("spark.hadoop."+entry.getKey(), entry.getValue()); } }); UserGroupInformation.setConfiguration(hc); UserGroupInformation.loginUserFromKeytab(Principal, Keytab); System.out.println("spark-conf##"); System.out.println(sc.toDebugString()); SparkSession sparkSession = SparkSession .builder() .master("yarn-client") //"yarn-client", "local" .config(sc) .appName(SparkEAZDebug.class.getName()) .enableHiveSupport() .getOrCreate(); Thanks very much. Keith
A bug in spark or hadoop RPC with kerberos authentication?
Hello, I met this very weird issue which, while easy to reproduce, stuck me for more than 1 day. I suspect this may be an issue/bug related to the class loader. Can you help confirm the root cause? I want to specify a customized Hadoop configuration set instead of those on the class path (we have a few Hadoop clusters, all with Kerberos security, and I want to support different configurations). Code/error like below. The workaround I found is to place a core-site.xml on the class path with the below 2 properties. By checking the RPC code under org.apache.hadoop.ipc.RPC, I suspect the RPC code may not see the UGI class in the same classloader, so UGI is initialized with the default value on the classpath, which is simple authentication. core-site.xml with the security setup on the classpath: <property><name>hadoop.security.authentication</name><value>kerberos</value></property> <property><name>hadoop.security.authorization</name><value>true</value></property> error-- 2673 [main] DEBUG org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil - DataTransferProtocol using SaslPropertiesResolver, configured QOP dfs.data.transfer.protection = privacy, configured class dfs.data.transfer.saslproperties.resolver.class = class org.apache.hadoop.security.WhitelistBasedResolver 2696 [main] DEBUG org.apache.hadoop.service.AbstractService - Service: org.apache.hadoop.yarn.client.api.impl.YarnClientImpl entered state INITED 2744 [main] DEBUG org.apache.hadoop.security.UserGroupInformation - PrivilegedAction as:x@xxxCOM (auth:KERBEROS) from:org.apache.hadoop.yarn.client.RMProxy.getProxy(RMProxy.java:136) // 2746 [main] DEBUG org.apache.hadoop.yarn.ipc.YarnRPC - Creating YarnRPC for org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC 2746 [main] DEBUG org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC - Creating a HadoopYarnProtoRpc proxy for protocol interface org.apache.hadoop.yarn.api.ApplicationClientProtocol 2801 [main] DEBUG org.apache.hadoop.ipc.Client - getting client out of cache: org.apache.hadoop.ipc.Client@748fe51d 2981 [main] DEBUG org.apache.hadoop.service.AbstractService - Service org.apache.hadoop.yarn.client.api.impl.YarnClientImpl is started 3004 [main] DEBUG org.apache.hadoop.ipc.Client - The ping interval is 6 ms. 3005 [main] DEBUG org.apache.hadoop.ipc.Client - Connecting to yarn-rm-1/x:8032 3019 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx] DEBUG org.apache.hadoop.ipc.Client - IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx: starting, having connections 1 3020 [IPC Parameter Sending Thread #0] DEBUG org.apache.hadoop.ipc.Client - IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx sending #0 3025 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx] DEBUG org.apache.hadoop.ipc.Client - IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx got value #-1 3026 [IPC Client (2012095985) connection to yarn-rm-1/x:8032 from x@xx] DEBUG org.apache.hadoop.ipc.Client - closing ipc connection to yarn-rm-1/x:8032: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS] org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.AccessControlException): SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS] at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1131) at org.apache.hadoop.ipc.Client$Connection.run(Client.java:979) ---code--- Configuration hc = new Configuration(false); hc.addResource("myconf/yarn-site.xml"); hc.addResource("myconf/core-site.xml"); hc.addResource("myconf/hdfs-site.xml"); hc.addResource("myconf/hive-site.xml"); SparkConf sc = new SparkConf(true); // add config in spark conf as no xml in the classpath except those "default.xml" from Hadoop jars. hc.forEach(entry-> { if(entry.getKey().startsWith("hive")) { sc.set(entry.getKey(), entry.getValue()); }else { sc.set("spark.hadoop."+entry.getKey(), entry.getValue()); } }); UserGroupInformation.setConfiguration(hc); UserGroupInformation.loginUserFromKeytab(Principal, Keytab); System.out.println("spark-conf##"); System.out.println(sc.toDebugString()); SparkSession sparkSession = SparkSession .builder() .master("yarn-client") //"yarn-client", "local" .config(sc) .appName(SparkEAZDebug.class.getName()) .enableHiveSupport() .getOrCreate(); Thanks very much. Keith
Re: What are some disadvantages of issuing a raw sql query to spark?
Here is an example of a window lead function: select *, lead(someColumn1) over (partition by someColumn2 order by someColumn3 asc nulls first) as someName from someTable Regards, Keith. http://keith-chapman.com On Tue, Jul 25, 2017 at 9:15 AM, kant kodali wrote: > How do I specify windowInterval and slideInterval using a raw sql string? > > On Tue, Jul 25, 2017 at 8:52 AM, Keith Chapman > wrote: > >> You could issue a raw SQL query to Spark; there is no particular >> advantage or disadvantage to doing so. Spark would build a logical plan >> from the raw SQL (or DSL) and optimize on that. Ideally you would end up >> with the same physical plan, irrespective of whether it was written in raw SQL or the >> DSL. >> >> Regards, >> Keith. >> >> http://keith-chapman.com >> >> On Tue, Jul 25, 2017 at 12:50 AM, kant kodali wrote: >> >>> Hi All, >>> >>> I just want to run a spark structured streaming job similar to this: >>> >>> DS.filter(col("name").equalTo("john")) >>> .groupBy(functions.window(df1.col("TIMESTAMP"), "24 hours", "24 >>> hours"), df1.col("hourlyPay")) >>> .agg(sum("hourlyPay").as("total")); >>> >>> >>> I am wondering if I can express the above query as a raw sql string? >>> >>> If so, what would that look like, and what are some of the disadvantages of >>> using a raw sql query vs the spark DSL? >>> >>> >>> Thanks! >>> >>> >> >
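For completeness, a sketch of the same lead window function via the DataFrame API (Spark 2.1+ for asc_nulls_first; column names match the SQL above, df is hypothetical):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lead}

val w = Window.partitionBy("someColumn2").orderBy(col("someColumn3").asc_nulls_first)
val result = df.withColumn("someName", lead(col("someColumn1"), 1).over(w))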
Re: What are some disadvantages of issuing a raw sql query to spark?
You could issue a raw SQL query to Spark; there is no particular advantage or disadvantage to doing so. Spark would build a logical plan from the raw SQL (or DSL) and optimize on that. Ideally you would end up with the same physical plan, irrespective of whether it was written in raw SQL or the DSL. Regards, Keith. http://keith-chapman.com On Tue, Jul 25, 2017 at 12:50 AM, kant kodali wrote: > Hi All, > > I just want to run a spark structured streaming job similar to this: > > DS.filter(col("name").equalTo("john")) > .groupBy(functions.window(df1.col("TIMESTAMP"), "24 hours", "24 > hours"), df1.col("hourlyPay")) > .agg(sum("hourlyPay").as("total")); > > > I am wondering if I can express the above query as a raw sql string? > > If so, what would that look like, and what are some of the disadvantages of > using a raw sql query vs the spark DSL? > > > Thanks! > >
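A small sketch showing both routes over the same (hypothetical) dataframe, which should produce matching physical plans:

import org.apache.spark.sql.functions.sum

df.createOrReplaceTempView("payroll")
val viaSql = spark.sql("SELECT name, sum(hourlyPay) AS total FROM payroll GROUP BY name")
val viaDsl = df.groupBy("name").agg(sum("hourlyPay").as("total"))
// viaSql.explain() and viaDsl.explain() should print the same physical plan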
Re: Get full RDD lineage for a spark job
You could also enable it with --conf spark.logLineage=true if you do not want to change any code. Regards, Keith. http://keith-chapman.com On Fri, Jul 21, 2017 at 7:57 PM, Keith Chapman wrote: > Hi Ron, > > You can try using the toDebugString method on the RDD; this will print > the RDD lineage. > > Regards, > Keith. > > http://keith-chapman.com > > On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez < > zlgonza...@yahoo.com.invalid> wrote: > >> Hi, >> Can someone point me to a test case or share sample code that is able >> to extract the RDD graph from a Spark job anywhere during its lifecycle? I >> understand that Spark has a UI that can show the graph of the execution, so >> I'm hoping that it is using some API somewhere that I could use. >> I know the RDD is the actual execution graph, so if there is also a more >> logical abstraction API closer to calls like map, filter, aggregate, etc., >> that would be even better. >> Appreciate any help... >> >> Thanks, >> Ron >> > >
Re: Get full RDD lineage for a spark job
Hi Ron, You can try using the toDebugString method on the RDD; this will print the RDD lineage. Regards, Keith. http://keith-chapman.com On Fri, Jul 21, 2017 at 11:24 AM, Ron Gonzalez wrote: > Hi, > Can someone point me to a test case or share sample code that is able to > extract the RDD graph from a Spark job anywhere during its lifecycle? I > understand that Spark has a UI that can show the graph of the execution, so > I'm hoping that it is using some API somewhere that I could use. > I know the RDD is the actual execution graph, so if there is also a more > logical abstraction API closer to calls like map, filter, aggregate, etc., > that would be even better. > Appreciate any help... > > Thanks, > Ron >
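A quick illustration (the file path is hypothetical):

val rdd = sc.textFile("hdfs:///data/input.txt").map(_.split(",")).filter(_.length > 1)
println(rdd.toDebugString) // prints the lineage, roughly: filter <- map <- HadoopRDD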
Re: Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?
Hi Nguyen, This looks promising and it seems like I could achieve it using cluster by. Thanks for the pointer. Regards, Keith. http://keith-chapman.com On Sat, Jun 24, 2017 at 5:27 AM, nguyen duc Tuan wrote: > Hi Chapman, > You can use "cluster by" to do what you want. > https://deepsense.io/optimize-spark-with-distribute-by-and-cluster-by/ > > 2017-06-24 17:48 GMT+07:00 Saliya Ekanayake : > >> I haven't worked with datasets but would this help >> https://stackoverflow.com/questions/37513667/how-to-create-a-spark-dataset-from-an-rdd? >> >> On Jun 23, 2017 5:43 PM, "Keith Chapman" wrote: >> >>> Hi, >>> >>> I have code that does the following using RDDs, >>> >>> val outputPartitionCount = 300 >>> val part = new MyOwnPartitioner(outputPartitionCount) >>> val finalRdd = myRdd.repartitionAndSortWithinPartitions(part) >>> >>> where myRdd is correctly formed as key, value pairs. I am looking >>> to convert this to use Dataset/Dataframe instead of RDDs, so my question is: >>> >>> Is there custom partitioning of Dataset/Dataframe implemented in Spark? >>> Can I accomplish the partial sort using mapPartitions on the resulting >>> partitioned Dataset/Dataframe? >>> >>> Any thoughts? >>> >>> Regards, >>> Keith. >>> >>> http://keith-chapman.com >>> >> >
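For the archive, a sketch of the CLUSTER BY route in Spark SQL (view and column names hypothetical):

df.createOrReplaceTempView("t")
// CLUSTER BY = DISTRIBUTE BY + SORT BY: repartitions on the key and sorts within each partition
val clustered = spark.sql("SELECT * FROM t CLUSTER BY key")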
Re: Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?
Thanks for the pointer Saliya, but I'm looking for an equivalent API in Dataset/Dataframe for repartitionAndSortWithinPartitions; I've already converted most of the RDDs to Dataframes. Regards, Keith. http://keith-chapman.com On Sat, Jun 24, 2017 at 3:48 AM, Saliya Ekanayake wrote: > I haven't worked with datasets but would this help https://stackoverflow.com/questions/37513667/how-to-create-a-spark-dataset-from-an-rdd? > > On Jun 23, 2017 5:43 PM, "Keith Chapman" wrote: > >> Hi, >> >> I have code that does the following using RDDs, >> >> val outputPartitionCount = 300 >> val part = new MyOwnPartitioner(outputPartitionCount) >> val finalRdd = myRdd.repartitionAndSortWithinPartitions(part) >> >> where myRdd is correctly formed as key, value pairs. I am looking to convert >> this to use Dataset/Dataframe instead of RDDs, so my question is: >> >> Is there custom partitioning of Dataset/Dataframe implemented in Spark? >> Can I accomplish the partial sort using mapPartitions on the resulting >> partitioned Dataset/Dataframe? >> >> Any thoughts? >> >> Regards, >> Keith. >> >> http://keith-chapman.com >> >
Is there an api in Dataset/Dataframe that does repartitionAndSortWithinPartitions?
Hi, I have code that does the following using RDDs, val outputPartitionCount = 300 val part = new MyOwnPartitioner(outputPartitionCount) val finalRdd = myRdd.repartitionAndSortWithinPartitions(part) where myRdd is correctly formed as key, value pairs. I am looking to convert this to use Dataset/Dataframe instead of RDDs, so my question is: Is there custom partitioning of Dataset/Dataframe implemented in Spark? Can I accomplish the partial sort using mapPartitions on the resulting partitioned Dataset/Dataframe? Any thoughts? Regards, Keith. http://keith-chapman.com
Re: Alternatives for dataframe collectAsList()
As Paul said, it really depends on what you want to do with your data; perhaps writing it to a file would be a better option, but again it depends on what you want to do with the data you collect. Regards, Keith. http://keith-chapman.com On Tue, Apr 4, 2017 at 7:38 AM, Eike von Seggern wrote: > Hi, > > depending on what you're trying to achieve `RDD.toLocalIterator()` might > help you. > > Best > > Eike > > > 2017-03-29 21:00 GMT+02:00 szep.laszlo.it : > >> Hi, >> >> after I created a dataset >> >> Dataset<Row> df = sqlContext.sql("query"); >> >> I need to have the result values, so I call a method: collectAsList() >> >> List<Row> list = df.collectAsList(); >> >> But it's very slow if I work with large datasets (20-30 million >> records). I >> know that the result isn't present in the driver app; that's why it takes a >> long time, because collectAsList() collects all data from worker nodes. >> >> But then what is the right way to get result values? Is there another >> solution to iterate over the result dataset rows, or get values? Can anyone >> post a small & working example? >> >> Thanks & Regards, >> Laszlo Szep >>
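If the goal is simply to iterate over a large result without materializing it all on the driver, a sketch using toLocalIterator, which fetches one partition at a time (Scala syntax):

import scala.collection.JavaConverters._

df.toLocalIterator().asScala.foreach { row =>
  println(row) // process one row at a time; only one partition is held on the driver
}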
Re: Having issues reading a csv file into a DataSet using Spark 2.1
Thanks for the advice Diego, that was very helpful. How could I read the CSV as a Dataset, though? I need to do a map operation over the dataset; I just coded up an example to illustrate the issue On Mar 22, 2017 6:43 PM, "Diego Fanesi" wrote: > You are using Spark as a library but it is much more than that. The book > "Learning Spark" is very well done and it helped me a lot starting with > Spark. Maybe you should start from there. > > These are the issues in your code: > > Basically, you generally don't execute spark code like that. You could, but > it is not officially supported and many functions don't work in that way. > You should start your local cluster made of a master and a single worker, then > make a jar with your code and use spark-submit to send it to the cluster. > > You generally never use args because spark is a multiprocess, multi-thread > application, so args will not be available everywhere. > > All contexts have been merged into the same context in the last versions > of Spark, so you will need to do something like this: > > import org.apache.spark.sql.{DataFrame, SparkSession} > > object DatasetTest{ > > val spark: SparkSession = SparkSession > .builder() .master("local[8]") > .appName("Spark basic example").getOrCreate() > > import spark.implicits._ > > def main(Args: Array[String]) { > > var x = spark.read.format("csv").load("/home/user/data.csv") > > x.show() > > } > > } > > > hope this helps. > > Diego > > On 22 Mar 2017 7:18 pm, "Keith Chapman" wrote: > > Hi, > > I'm trying to read in a CSV file into a Dataset but keep having > compilation issues. I'm using spark 2.1 and the following is a small > program that exhibits the issue I'm having. I've searched around but not > found a solution that worked; I've added "import sqlContext.implicits._" as > suggested, but no luck. What am I missing? Would appreciate some advice. > > import org.apache.spark.sql.functions._ > import org.apache.spark.{SparkContext, SparkConf} > import org.apache.spark.sql.{Encoder,Encoders} > > object DatasetTest{ > > def main(args: Array[String]) { > val sparkConf = new SparkConf().setAppName("DatasetTest") > val sc = new SparkContext(sparkConf) > case class Foo(text: String) > val sqlContext = new org.apache.spark.sql.SQLContext(sc) > import sqlContext.implicits._ > val ds : org.apache.spark.sql.Dataset[Foo] = > sqlContext.read.csv(args(1)).as[Foo] > ds.show > } > } > > Compiling the above program gives, I'd expect it to work as its a simple > case class, changing it to as[String] works, but I would like to get the > case class to work. > > [error] /home/keith/dataset/DataSetTest.scala:13: Unable to find encoder > for type stored in a Dataset. Primitive types (Int, String, etc) and > Product types (case classes) are supported by importing spark.implicits._ > Support for serializing other types will be added in future releases. > [error] val ds : org.apache.spark.sql.Dataset[Foo] = > sqlContext.read.csv(args(1)).as[Foo] > > > Regards, > Keith. > > >
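One known way to make the original example compile is to move the case class to the top level, outside any method, so the implicit encoder can be derived; a sketch (note the toDF("text") rename, since CSV columns come in as _c0, _c1, ...):

import org.apache.spark.sql.{Dataset, SparkSession}

case class Foo(text: String) // top level, not inside main

object DatasetTest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DatasetTest").getOrCreate()
    import spark.implicits._
    val ds: Dataset[Foo] = spark.read.csv(args(1)).toDF("text").as[Foo]
    ds.show()
  }
}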
Having issues reading a csv file into a DataSet using Spark 2.1
Hi, I'm trying to read in a CSV file into a Dataset but keep having compilation issues. I'm using spark 2.1 and the following is a small program that exhibits the issue I'm having. I've searched around but not found a solution that worked; I've added "import sqlContext.implicits._" as suggested, but no luck. What am I missing? Would appreciate some advice. import org.apache.spark.sql.functions._ import org.apache.spark.{SparkContext, SparkConf} import org.apache.spark.sql.{Encoder,Encoders} object DatasetTest{ def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("DatasetTest") val sc = new SparkContext(sparkConf) case class Foo(text: String) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext.implicits._ val ds : org.apache.spark.sql.Dataset[Foo] = sqlContext.read.csv(args(1)).as[Foo] ds.show } } Compiling the above program gives the error below. I'd expect it to work as it's a simple case class; changing it to as[String] works, but I would like to get the case class to work. [error] /home/keith/dataset/DataSetTest.scala:13: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. [error] val ds : org.apache.spark.sql.Dataset[Foo] = sqlContext.read.csv(args(1)).as[Foo] Regards, Keith.
Re:
Hi Jacek, I've looked at SparkListener and tried it; I see it getting fired on the master, but I don't see it getting fired on the workers in a cluster. Regards, Keith. http://keith-chapman.com On Fri, Jan 20, 2017 at 11:09 AM, Jacek Laskowski wrote: > Hi, > > (redirecting to users as it has nothing to do with Spark project > development) > > Monitor jobs and stages using SparkListener and submit cleanup jobs where > a condition holds. > > Jacek > > On 20 Jan 2017 3:57 a.m., "Keith Chapman" wrote: > >> Hi, >> >> Is it possible for an executor (or slave) to know when an actual job >> ends? I'm running spark on a cluster (with yarn) and my workers create some >> temporary files that I would like to clean up once the job ends. Is there a >> way for the worker to detect that a job has finished? I tried doing it in >> the JobProgressListener but it does not seem to work in a cluster. The >> event is not triggered in the worker. >> >> Regards, >> Keith. >> >> http://keith-chapman.com >>
Library dependencies in Spark
I recently wrote a blog post[1] sharing my experiences with using Apache Spark to load data into Apache Fluo. One of the things I cover in this blog post is late binding of dependencies and exclusion of provided dependencies when building a shaded jar. When writing the post, I was unsure about dependency isolation and convergence expectations in the Spark env. Does Spark support any form of dependency isolation for user code? For example, can the Spark framework use Guava version X while user code uses Guava version Y? This is assuming the user packaged Guava version Y in their shaded jar. Or, are Spark users expected to converge their user dependency versions with those used by Spark? For example, the user is expected to converge their code to use Guava version X, which is used by the Spark framework. [1]: http://fluo.apache.org/blog/2016/12/22/spark-load/
Re: Long-running job OOMs driver process
Thanks for the input. I had read somewhere that s3:// was the way to go due to some recent changes, but apparently that was outdated. I’m working on creating some dummy data and a script to process it right now. I’ll post here with code and logs when I can successfully reproduce the issue on non-production data. Yong, that's a good point about the web content. I had forgotten to mention that when I first saw this a few months ago, on another project, I could sometimes trigger the OOM by trying to view the web UI for the job. That's another case I'll try to reproduce. Thanks again! Keith. On Fri, Nov 18, 2016 at 10:30 AM Yong Zhang wrote: > Just wondering, is it possible the memory usage keeps going up due to > the web UI content? > > Yong > > -- > *From:* Alexis Seigneurin > *Sent:* Friday, November 18, 2016 10:17 AM > *To:* Nathan Lande > *Cc:* Keith Bourgoin; Irina Truong; u...@spark.incubator.apache.org > *Subject:* Re: Long-running job OOMs driver process > > +1 for using S3A. > > It would also depend on what format you're using. I agree with Steve that > Parquet, for instance, is a good option. If you're using plain text files, > some people use GZ files but they cannot be partitioned, thus putting a lot > of pressure on the driver. It doesn't look like this is the issue you're > running into, though, because it would not be a progressive slow down, but > please provide as much detail as possible about your app. > > The cache could be an issue but the OOM would come from an executor, not > from the driver. > > From what you're saying, Keith, it indeed looks like some memory is not > being freed. Seeing the code would help. If you can, also send all the logs > (with Spark at least in INFO level). > > Alexis > > On Fri, Nov 18, 2016 at 10:08 AM, Nathan Lande > wrote: > > +1 to not threading. > > What does your load look like? If you are loading many files and caching > them in N RDDs rather than 1 RDD this could be an issue. > > If the above two things don't fix your OOM issue, without knowing anything > else about your job, I would focus on your caching strategy as a potential > culprit. Try running without any caching to isolate the issue; bad > caching strategy is the source of OOM issues for me most of the time. > > On Nov 18, 2016 6:31 AM, "Keith Bourgoin" wrote: > > Hi Alexis, > > Thanks for the response. I've been working with Irina on trying to sort > this issue out. > > We thread the file processing to amortize the cost of things like getting > files from S3. It's a pattern we've seen recommended in many places, but I > don't have any of those links handy. The problem isn't the threading, per > se, but clearly some sort of memory leak in the driver itself. Each file > is a self-contained unit of work, so once it's done all memory related to > it should be freed. Nothing in the script itself grows over time, so if it > can do 10 concurrently, it should be able to run like that forever. > > I've hit this same issue working on another Spark app which wasn't > threaded, but produced tens of thousands of jobs. Eventually, the Spark UI > would get slow, then unresponsive, and then be killed due to OOM. > > I'll try to cook up some examples of this today, threaded and not. We were > hoping that someone had seen this before and it rang a bell. Maybe there's > a setting to clean up info from old jobs that we can adjust. > > Cheers, > > Keith.
> On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin wrote: > > Hi Irina, > > I would question the use of multiple threads in your application. Since > Spark is going to run the processing of each DataFrame on all the cores of > your cluster, the processes will be competing for resources. In fact, they > would not only compete for CPU cores but also for memory. > > Spark is designed to run your processes in a sequence, and each process > will be run in a distributed manner (multiple threads on multiple > instances). I would suggest following this principle. > > Feel free to share the code if you can. It's always helpful so that we can > give better advice. > > Alexis > > On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong wrote: > > We have an application that reads text files, converts them to dataframes, > and saves them in Parquet format. The application runs fine when processing > a few files, but we have several thousand produced every day. When running > the job for all files, we have spark-submit killed on OOM: > > # > # java.lang.OutOfMemoryError:
Re: Long-running job OOMs driver process
Hi Alexis, Thanks for the response. I've been working with Irina on trying to sort this issue out. We thread the file processing to amortize the cost of things like getting files from S3. It's a pattern we've seen recommended in many places, but I don't have any of those links handy. The problem isn't the threading, per se, but clearly some sort of memory leak in the driver itself. Each file is a self-contained unit of work, so once it's done all memory related to it should be freed. Nothing in the script itself grows over time, so if it can do 10 concurrently, it should be able to run like that forever. I've hit this same issue working on another Spark app which wasn't threaded, but produced tens of thousands of jobs. Eventually, the Spark UI would get slow, then unresponsive, and then be killed due to OOM. I'll try to cook up some examples of this today, threaded and not. We were hoping that someone had seen this before and it rang a bell. Maybe there's a setting to clean up info from old jobs that we can adjust. Cheers, Keith. On Thu, Nov 17, 2016 at 9:50 PM Alexis Seigneurin wrote: > Hi Irina, > > I would question the use of multiple threads in your application. Since > Spark is going to run the processing of each DataFrame on all the cores of > your cluster, the processes will be competing for resources. In fact, they > would not only compete for CPU cores but also for memory. > > Spark is designed to run your processes in a sequence, and each process > will be run in a distributed manner (multiple threads on multiple > instances). I would suggest following this principle. > > Feel free to share the code if you can. It's always helpful so that we can > give better advice. > > Alexis > > On Thu, Nov 17, 2016 at 8:51 PM, Irina Truong wrote: > > We have an application that reads text files, converts them to dataframes, > and saves them in Parquet format. The application runs fine when processing > a few files, but we have several thousand produced every day. When running > the job for all files, we have spark-submit killed on OOM: > > # > # java.lang.OutOfMemoryError: Java heap space > # -XX:OnOutOfMemoryError="kill -9 %p" > # Executing /bin/sh -c "kill -9 27226"... > > The job is written in Python. We’re running it in Amazon EMR 5.0 (Spark > 2.0.0) with spark-submit. We’re using a cluster with a master c3.2xlarge > instance (8 cores and 15g of RAM) and 3 core c3.4xlarge instances (16 cores > and 30g of RAM each). Spark config settings are as follows: > > ('spark.serializer', 'org.apache.spark.serializer.KryoSerializer'), > > ('spark.executors.instances', '3'), > > ('spark.yarn.executor.memoryOverhead', '9g'), > > ('spark.executor.cores', '15'), > > ('spark.executor.memory', '12g'), > > ('spark.scheduler.mode', 'FIFO'), > > ('spark.cleaner.ttl', '1800'), > > The job processes each file in a thread, and we have 10 threads running > concurrently. The process will OOM after about 4 hours, at which point > Spark has processed over 20,000 jobs. > > It seems like the driver is running out of memory, but each individual job > is quite small. Are there any known memory leaks for long-running Spark > applications on Yarn? > > > > > -- > > *Alexis Seigneurin* > *Managing Consultant* > (202) 459-1591 - LinkedIn: http://www.linkedin.com/in/alexisseigneurin > > http://ipponusa.com/ >
Re: Using Java in Spark shell
There is no Java shell in Spark. > On May 25, 2016, at 1:11 AM, Ashok Kumar wrote: > > Hello, > > A newbie question. > > Is it possible to use Java code directly in the Spark shell without using Maven > to build a jar file? > > How can I switch from Scala to Java in the Spark shell? > > Thanks > >
Spark SQL "partition stride"?
The spark docs section for "JDBC to Other Databases" (https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases) describes the partitioning as "... Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table." What is meant by "partition stride" here? I'm not familiar with the phrase, and googling didn't help. Also, is the behavior of this partitioning described in detail somewhere? Looking at my SQL query log I've figured out what it's doing in my example (say X = (upperBound - lowerBound) / numPartitions): query * where partitionColumn < lowerBound query * where partitionColumn >= lowerBound and partitionColumn < lowerBound + X query * where partitionColumn >= lowerBound+X and partitionColumn < lowerBound+2X until the query gets to upperBound But it would be nice to know if there are docs on this?
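For context, a sketch of the read call these bounds feed into (connection details are hypothetical; on Spark 1.x the entry point is sqlContext.read rather than spark.read). With lowerBound = 0, upperBound = 100000, and numPartitions = 4, the stride is (100000 - 0) / 4 = 25000, which produces exactly the staggered WHERE clauses observed above:

import java.util.Properties

val props = new Properties() // hypothetical credentials would go here
val df = spark.read.jdbc(
  "jdbc:postgresql://dbhost/mydb", // url
  "events",                        // table
  "id",                            // partitionColumn
  0L,                              // lowerBound
  100000L,                         // upperBound
  4,                               // numPartitions
  props)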
python rdd.partitionBy(): any examples of a custom partitioner?
I'm not a python expert, so I'm wondering if anybody has a working example of a partitioner for the "partitionFunc" argument (default "portable_hash") to rdd.partitionBy()?
Spark 1.4.0 SQL JDBC "partition stride"?
The spark docs section for "JDBC to Other Databases" (https://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases) describes the partitioning as "... Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table." What is meant by "partition stride" here? I'm not familiar with the phrase, and googling didn't help. Also, is the behaviour of this partitioning described in detail somewhere? Looking at my SQL query log I've figured out what it's doing in my example (X = (upperBound - lowerBound) / numPartitions): query * where partitionColumn < lowerBound query * where partitionColumn >= lowerBound and partitionColumn < lowerBound + X query * where partitionColumn >= lowerBound+X and partitionColumn < lowerBound+X+X until the query gets to upperBound But it would be nice to know if there's docs on this?
Re: Loading RDDs in a streaming fashion
Yep, that's definitely possible. It's one of the workarounds I was considering. I was just curious if there was a simpler (and perhaps more efficient) approach. Keith On Mon, Dec 1, 2014 at 6:28 PM, Andy Twigg wrote: > Could you modify your function so that it streams through the files record > by record and outputs them to hdfs, then read them all in as RDDs and take > the union? That would only use bounded memory. > > On 1 December 2014 at 17:19, Keith Simmons wrote: > >> Actually, I'm working with a binary format. The api allows reading out a >> single record at a time, but I'm not sure how to get those records into >> spark (without reading everything into memory from a single file at once). >> >> >> >> On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg wrote: >> >>> file => tranform file into a bunch of records >>> >>> >>> What does this function do exactly? Does it load the file locally? >>> Spark supports RDDs exceeding global RAM (cf the terasort example), but >>> if your example just loads each file locally, then this may cause problems. >>> Instead, you should load each file into an rdd with context.textFile(), >>> flatmap that and union these rdds. >>> >>> also see >>> >>> http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files >>> >>> >>> On 1 December 2014 at 16:50, Keith Simmons wrote: >>> >>>> This is a long shot, but... >>>> >>>> I'm trying to load a bunch of files spread out over hdfs into an RDD, >>>> and in most cases it works well, but for a few very large files, I exceed >>>> available memory. My current workflow basically works like this: >>>> >>>> context.parallelize(fileNames).flatMap { file => >>>> tranform file into a bunch of records >>>> } >>>> >>>> I'm wondering if there are any APIs to somehow "flush" the records of a >>>> big dataset so I don't have to load them all into memory at once. I know >>>> this doesn't exist, but conceptually: >>>> >>>> context.parallelize(fileNames).streamMap { (file, stream) => >>>> for every 10K records write records to stream and flush >>>> } >>>> >>>> Keith >>>> >>> >>> >> >
Re: Loading RDDs in a streaming fashion
Actually, I'm working with a binary format. The api allows reading out a single record at a time, but I'm not sure how to get those records into spark (without reading everything into memory from a single file at once). On Mon, Dec 1, 2014 at 5:07 PM, Andy Twigg wrote: > file => tranform file into a bunch of records > > > What does this function do exactly? Does it load the file locally? > Spark supports RDDs exceeding global RAM (cf the terasort example), but if > your example just loads each file locally, then this may cause problems. > Instead, you should load each file into an rdd with context.textFile(), > flatmap that and union these rdds. > > also see > > http://stackoverflow.com/questions/23397907/spark-context-textfile-load-multiple-files > > > On 1 December 2014 at 16:50, Keith Simmons wrote: > >> This is a long shot, but... >> >> I'm trying to load a bunch of files spread out over hdfs into an RDD, and >> in most cases it works well, but for a few very large files, I exceed >> available memory. My current workflow basically works like this: >> >> context.parallelize(fileNames).flatMap { file => >> tranform file into a bunch of records >> } >> >> I'm wondering if there are any APIs to somehow "flush" the records of a >> big dataset so I don't have to load them all into memory at once. I know >> this doesn't exist, but conceptually: >> >> context.parallelize(fileNames).streamMap { (file, stream) => >> for every 10K records write records to stream and flush >> } >> >> Keith >> > >
Loading RDDs in a streaming fashion
This is a long shot, but... I'm trying to load a bunch of files spread out over hdfs into an RDD, and in most cases it works well, but for a few very large files, I exceed available memory. My current workflow basically works like this: context.parallelize(fileNames).flatMap { file => transform file into a bunch of records } I'm wondering if there are any APIs to somehow "flush" the records of a big dataset so I don't have to load them all into memory at once. I know this doesn't exist, but conceptually: context.parallelize(fileNames).streamMap { (file, stream) => for every 10K records write records to stream and flush } Keith
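If the format's reader can hand back one record at a time, one simpler approach (no intermediate HDFS round-trip) is to return a lazy Iterator from flatMap; Spark pulls records from it incrementally instead of materializing the whole file. A sketch, where RecordReader and Record are hypothetical stand-ins for the binary format's API:

context.parallelize(fileNames).flatMap { file =>
  new Iterator[Record] {
    private val reader = new RecordReader(file)       // hypothetical reader API
    private var record: Record = reader.readRecord()  // assume null at end of file
    def hasNext: Boolean = record != null
    def next(): Record = {
      val current = record
      record = reader.readRecord()
      if (record == null) reader.close()
      current
    }
  }
}

The caveat is that a shuffle or cache downstream will still buffer, but the load itself stays bounded.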
Re: Setting only master heap
Hi Guys, Here's some lines from the log file before the OOM. They don't look that helpful, so let me know if there's anything else I should be sending. I am running in standalone mode. spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError: Java heap space spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22 05:00:36 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-52] shutting down ActorSystem [sparkMaster] spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError: Java heap space spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:Exception in thread "qtp2057079871-30" java.lang.OutOfMemoryError: Java heap space spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22 05:00:07 WARN AbstractNioSelector: Unexpected exception in the selector loop. spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22 05:02:51 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-8] shutting down ActorSystem [sparkMaster] spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError: Java heap space spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22 05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-38] shutting down ActorSystem [sparkMaster] spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError: Java heap space spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22 05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-6] shutting down ActorSystem [sparkMaster] spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError: Java heap space spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22 05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-43] shutting down ActorSystem [sparkMaster] spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError: Java heap space spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22 05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-13] shutting down ActorSystem [sparkMaster] spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError: Java heap space spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22 05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-5] shutting down ActorSystem [sparkMaster] spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5:java.lang.OutOfMemoryError: Java heap space spark-pulse-org.apache.spark.deploy.master.Master-1-hadoop10.pulse.io.out.5-14/10/22 05:03:22 ERROR ActorSystemImpl: Uncaught fatal error from thread [sparkMaster-akka.actor.default-dispatcher-12] shutting down ActorSystem [sparkMaster] On Thu, Oct 23, 2014 at 2:10 PM, Nan Zhu wrote: > h… > > my observation is that, master in Spark 1.1 has higher frequency of GC…… > > Also, before 1.1, I never encounter GC overtime in Master, after upgrade > to 1.1, I have met for 2 times (we upgrade soon after 1.1 release)…. 
> > Best, > > -- > Nan Zhu > > On Thursday, October 23, 2014 at 1:08 PM, Andrew Or wrote: > > Yeah, as Sameer commented, there is unfortunately not an equivalent > `SPARK_MASTER_MEMORY` that you can set. You can work around this by > starting the master and the slaves separately with different settings of > SPARK_DAEMON_MEMORY each time. > > AFAIK there haven't been any major changes in the standalone master in > 1.1.0, so I don't see an immediate explanation for what you're observing. > In general the Spark master doesn't use that much memory, and even if there > are many applications it will discard the old ones appropriately, so unless > you have a ton (like thousands) of concurrently running applications > connecting to it there's little likelihood for it to OOM. At least that's > my understanding. > > -Andrew > > 2014-10-22 15:51 GMT-07:00 Sameer Farooqui : > > Hi Keith, > > Would be helpful if you could post the error message. > > Are you running Spark in Standalone mode or with YARN? > > In general, the Spark Master is only used for scheduling and it should be > fine with the default setting of 512 MB RAM. > > Is it actually the Spark Driver's memory that you intended to change? > > > > *++ If in Standalone m
Setting only master heap
We've been getting some OOMs from the spark master since upgrading to Spark 1.1.0. I've found SPARK_DAEMON_MEMORY, but that also seems to increase the worker heap, which as far as I know is fine. Is there any setting which *only* increases the master heap size? Keith
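For reference, the workaround sketched in the replies above boils down to bringing the daemons up separately, with a larger SPARK_DAEMON_MEMORY only in the master's environment (value illustrative):

SPARK_DAEMON_MEMORY=2g ./sbin/start-master.sh   # master gets the bigger heap
./sbin/start-slaves.sh                          # workers keep the default

Not as tidy as a dedicated SPARK_MASTER_MEMORY would be, but it only touches the master process.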
Re: Hung spark executors don't count toward worker memory limit
Maybe I should put this another way. If spark has two jobs, A and B, both of which consume the entire allocated memory pool, is it expected that spark can launch B before the executor processes tied to A are completely terminated? On Thu, Oct 9, 2014 at 6:57 PM, Keith Simmons wrote: > Actually, it looks like even when the job shuts down cleanly, there can be > a few minutes of overlap between the time the next job launches and the > first job actually terminates it's process. Here's some relevant lines > from my log: > > 14/10/09 20:49:20 INFO Worker: Asked to kill executor > app-20141009204127-0029/1 > 14/10/09 20:49:20 INFO ExecutorRunner: Runner thread for executor > app-20141009204127-0029/1 interrupted > 14/10/09 20:49:20 INFO ExecutorRunner: Killing process! > 14/10/09 20:49:20 INFO Worker: Asked to launch executor > app-20141009204508-0030/1 for Job > ... More lines about launching new job... > 14/10/09 20:51:17 INFO Worker: Executor app-20141009204127-0029/1 finished > with state KILLED > > As you can see, the first app didn't actually shutdown until two minutes > after the new job launched. During that time, I was at double the worker > memory limit. > > Keith > > > On Thu, Oct 9, 2014 at 5:06 PM, Keith Simmons wrote: > >> Hi Folks, >> >> We have a spark job that is occasionally running out of memory and >> hanging (I believe in GC). This is it's own issue we're debugging, but in >> the meantime, there's another unfortunate side effect. When the job is >> killed (most often because of GC errors), each worker attempts to kill its >> respective executor. However, it appears that several of the executors >> fail to shut themselves down (I actually have to kill -9 them). However, >> even though the worker fails to successfully cleanup the executor, it >> starts the next job as though all the resources have been freed up. This >> is causing the spark worker to exceed it's configured memory limit, which >> is in turn running our boxes out of memory. Is there a setting I can >> configure to prevent this issue? Perhaps by having the worker force kill >> the executor or not start the next job until it's confirmed the executor >> has exited? Let me know if there's any additional information I can >> provide. >> >> Keith >> >> P.S. We're running spark 1.0.2 >> > >
Re: Hung spark executors don't count toward worker memory limit
Actually, it looks like even when the job shuts down cleanly, there can be a few minutes of overlap between the time the next job launches and the first job actually terminates its process. Here are some relevant lines from my log: 14/10/09 20:49:20 INFO Worker: Asked to kill executor app-20141009204127-0029/1 14/10/09 20:49:20 INFO ExecutorRunner: Runner thread for executor app-20141009204127-0029/1 interrupted 14/10/09 20:49:20 INFO ExecutorRunner: Killing process! 14/10/09 20:49:20 INFO Worker: Asked to launch executor app-20141009204508-0030/1 for Job ... More lines about launching new job... 14/10/09 20:51:17 INFO Worker: Executor app-20141009204127-0029/1 finished with state KILLED As you can see, the first app didn't actually shut down until two minutes after the new job launched. During that time, I was at double the worker memory limit. Keith On Thu, Oct 9, 2014 at 5:06 PM, Keith Simmons wrote: > Hi Folks, > > We have a spark job that is occasionally running out of memory and hanging > (I believe in GC). This is its own issue we're debugging, but in the > meantime, there's another unfortunate side effect. When the job is killed > (most often because of GC errors), each worker attempts to kill its > respective executor. However, it appears that several of the executors > fail to shut themselves down (I actually have to kill -9 them). However, > even though the worker fails to successfully clean up the executor, it > starts the next job as though all the resources have been freed up. This > is causing the spark worker to exceed its configured memory limit, which > is in turn running our boxes out of memory. Is there a setting I can > configure to prevent this issue? Perhaps by having the worker force kill > the executor or not start the next job until it's confirmed the executor > has exited? Let me know if there's any additional information I can > provide. > > Keith > > P.S. We're running spark 1.0.2 >
Hung spark executors don't count toward worker memory limit
Hi Folks, We have a spark job that is occasionally running out of memory and hanging (I believe in GC). This is its own issue we're debugging, but in the meantime, there's another unfortunate side effect. When the job is killed (most often because of GC errors), each worker attempts to kill its respective executor. However, it appears that several of the executors fail to shut themselves down (I actually have to kill -9 them). However, even though the worker fails to successfully clean up the executor, it starts the next job as though all the resources have been freed up. This is causing the spark worker to exceed its configured memory limit, which is in turn running our boxes out of memory. Is there a setting I can configure to prevent this issue? Perhaps by having the worker force kill the executor or not start the next job until it's confirmed the executor has exited? Let me know if there's any additional information I can provide. Keith P.S. We're running spark 1.0.2
Re: GraphX : AssertionError
The triangle count also failed for me when I ran it on more than one node. There is this assertion in TriangleCount.scala that causes the failure: // double count should be even (divisible by two) assert((dblCount & 1) == 0) That did not hold true when I ran this on multiple nodes, even when following the guidelines to make sure that all source ids are greater than destination ids and partitioning the graph using RandomVertexCut. I didn't dig into the code to see why this assertion was failing, but commenting that line out allowed the code to run. I'm not sure how much I can trust the results, but they looked generally right. On Wed, Sep 10, 2014 at 6:31 PM, Vipul Pandey wrote: > Hi, > > I have a small graph with about 3.3M vertices and close to 7.5M edges. > It's a pretty innocent graph with the max degree of 8. > Unfortunately, graph.traingleCount is failing on me with the exception > below. I'm running a spark-shell on CDH5.1 with the following params : > SPARK_DRIVER_MEM=10g ADD_JARS=./path/to/my-jar-with-dependencies.jar > SPARK_WORKER_INSTANCES=120 SPARK_WORKER_MEMORY=5g > SPARK_YARN_APP_NAME=VipulsSparkShell MASTER=yarn-client spark-shell > > Any clue anyone? > Vipul > > > 14/09/10 16:12:22 INFO cluster.YarnClientClusterScheduler: Stage 80 was > cancelled > 14/09/10 16:12:22 INFO scheduler.TaskSetManager: Loss was due to > java.lang.AssertionError: assertion failed [duplicate 8] > 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 326 was killed. > 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 325 was killed. > 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 320 was killed. > 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 324 was killed. > 14/09/10 16:12:22 WARN scheduler.TaskSetManager: Task 322 was killed. > > org.apache.spark.SparkException: Job aborted due to stage failure: Task > 80.0:6 failed 4 times, most recent failure: Exception failure in TID 321 on > host abc.xyz.com: java.lang.AssertionError: assertion failed > scala.Predef$.assert(Predef.scala:165) > > org.apache.spark.graphx.lib.TriangleCount$$anonfun$run$1.apply(TriangleCount.scala:89) > > org.apache.spark.graphx.lib.TriangleCount$$anonfun$run$1.apply(TriangleCount.scala:86) > > org.apache.spark.graphx.impl.VertexPartitionBaseOps.leftJoin(VertexPartitionBaseOps.scala:125) > > org.apache.spark.graphx.VertexRDD$$anonfun$3.apply(VertexRDD.scala:192) > > org.apache.spark.graphx.VertexRDD$$anonfun$3.apply(VertexRDD.scala:189) > > org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:87) > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35) > org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262) > org.apache.spark.rdd.RDD.iterator(RDD.scala:229) > > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) > > org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) > org.apache.spark.scheduler.Task.run(Task.scala:51) > > org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) > > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > java.lang.Thread.run(Thread.java:744) > Driver stacktrace: > at org.apache.spark.scheduler.DAGScheduler.org > $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033) > at > 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at > scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) > at > org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:633) > at scala.Option.foreach(Option.scala:236) > at > org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:633) > at > org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1207) > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) > at akka.actor.ActorCell.invoke(ActorCell.scala:456) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) > at akka.dispatch.Mailbox.run(Mailbox.scala:219) > at > akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) > at > scala.concurrent.forkjo
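For anyone else hitting this, a sketch of the documented preconditions for triangleCount: every edge oriented so srcId < dstId, plus a deterministic partition strategy. The graph construction here is made up for illustration (rawEdges stands in for wherever your edges come from):

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

def countTriangles(rawEdges: RDD[Edge[Int]]) = {
  // Orient every edge canonically, as triangleCount expects.
  val canonical = rawEdges.map { e =>
    if (e.srcId < e.dstId) e else Edge(e.dstId, e.srcId, e.attr)
  }
  Graph.fromEdges(canonical, defaultValue = 0)
    .partitionBy(PartitionStrategy.RandomVertexCut)
    .triangleCount()
    .vertices
}

If the assertion still trips after this, as reported above, that points at a genuine bug worth filing rather than a usage problem.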
Re: Error while running Spark SQL join when using Spark 1.0.1
Cool. So Michael's hunch was correct, it is a thread issue. I'm currently using a tarball build, but I'll do a spark build with the patch as soon as I have a chance and test it out. Keith On Tue, Jul 15, 2014 at 4:14 PM, Zongheng Yang wrote: > Hi Keith & gorenuru, > > This patch (https://github.com/apache/spark/pull/1423) solves the > errors for me in my local tests. If possible, can you guys test this > out to see if it solves your test programs? > > Thanks, > Zongheng > > On Tue, Jul 15, 2014 at 3:08 PM, Zongheng Yang > wrote: > > - user@incubator > > > > Hi Keith, > > > > I did reproduce this using local-cluster[2,2,1024], and the errors > > look almost the same. Just wondering, despite the errors did your > > program output any result for the join? On my machine, I could see the > > correct output. > > > > Zongheng > > > > On Tue, Jul 15, 2014 at 1:46 PM, Michael Armbrust > > wrote: > >> Thanks for the extra info. At a quick glance the query plan looks fine > to > >> me. The class IntegerType does build a type tag I wonder if you are > >> seeing the Scala issue manifest in some new way. We will attempt to > >> reproduce locally. > >> > >> > >> On Tue, Jul 15, 2014 at 1:41 PM, gorenuru wrote: > >>> > >>> Just my "few cents" on this. > >>> > >>> I having the same problems with v 1.0.1 but this bug is sporadic and > looks > >>> like is relayed to object initialization. > >>> > >>> Even more, i'm not using any SQL or something. I just have utility > class > >>> like this: > >>> > >>> object DataTypeDescriptor { > >>> type DataType = String > >>> > >>> val BOOLEAN = "BOOLEAN" > >>> val STRING = "STRING" > >>> val TIMESTAMP = "TIMESTAMP" > >>> val LONG = "LONG" > >>> val INT = "INT" > >>> val SHORT = "SHORT" > >>> val BYTE = "BYTE" > >>> val DECIMAL = "DECIMAL" > >>> val DOUBLE = "DOUBLE" > >>> val FLOAT = "FLOAT" > >>> > >>> def $$(name: String, format: Option[String] = None) = > >>> DataTypeDescriptor(name, format) > >>> > >>> private lazy val nativeTypes: Map[String, NativeType] = Map( > >>> BOOLEAN -> BooleanType, STRING -> StringType, TIMESTAMP -> > >>> TimestampType, LONG -> LongType, INT -> IntegerType, > >>> SHORT -> ShortType, BYTE -> ByteType, DECIMAL -> DecimalType, > DOUBLE > >>> -> > >>> DoubleType, FLOAT -> FloatType > >>> ) > >>> > >>> lazy val defaultValues: Map[String, Any] = Map( > >>> BOOLEAN -> false, STRING -> "", TIMESTAMP -> null, LONG -> 0L, INT > -> > >>> 0, > >>> SHORT -> 0.toShort, BYTE -> 0.toByte, > >>> DECIMAL -> BigDecimal(0d), DOUBLE -> 0d, FLOAT -> 0f > >>> ) > >>> > >>> def apply(dataType: String): DataTypeDescriptor = { > >>> DataTypeDescriptor(dataType.toUpperCase, None) > >>> } > >>> > >>> def apply(dataType: SparkDataType): DataTypeDescriptor = { > >>> nativeTypes > >>> .find { case (_, descriptor) => descriptor == dataType } > >>> .map { case (name, descriptor) => DataTypeDescriptor(name, None) > } > >>> .get > >>> } > >>> > >>> . > >>> > >>> and some test that check each of this methods. > >>> > >>> The problem is that this test fails randomly with this error. > >>> > >>> P.S.: I did not have this problem in Spark 1.0.0 > >>> > >>> > >>> > >>> -- > >>> View this message in context: > >>> > http://apache-spark-user-list.1001560.n3.nabble.com/Error-while-running-Spark-SQL-join-when-using-Spark-1-0-1-tp9776p9817.html > >>> Sent from the Apache Spark User List mailing list archive at > Nabble.com. > >> > >> >
Re: Error while running Spark SQL join when using Spark 1.0.1
va:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.scheduler.ShuffleMapTask$.deserializeInfo(ShuffleMapTask.scala:63) at org.apache.spark.scheduler.ShuffleMapTask.readExternal(ShuffleMapTask.scala:135) at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1836) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1795) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1349) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:369) at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:63) at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:85) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:165) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:701) On Tue, Jul 15, 2014 at 1:05 PM, Michael Armbrust wrote: > Can you print out the queryExecution? > > (i.e. println(sql().queryExecution)) > > > On Tue, Jul 15, 2014 at 12:44 PM, Keith Simmons > wrote: > >> To give a few more details of my environment in case that helps you >> reproduce: >> >> * I'm running spark 1.0.1 downloaded as a tar ball, not built myself >> * I'm running in stand alone mode, with 1 master and 1 worker, both on >> the same machine (though the same error occurs with two workers on two >> machines) >> * I'm using spark-core and spark-sql 1.0.1 pulled via maven >> >> Here's my built.sbt: >> >> name := "spark-test" >> >> version := "1.0" >> >> scalaVersion := "2.10.4" >> >> resolvers += "Akka Repository" at "http://repo.akka.io/releases/"; >> >> resolvers += "Cloudera Repository" at " >> https://repository.cloudera.com/artifactory/cloudera-repos/"; >> >> libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" % >> "provided" >> >> libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % >> "provided" >> >> >> On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang >> wrote: >> >>> FWIW, I am unable to reproduce this using the example program locally. >>> >>> On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons >>> wrote: >>> > Nope. All of them are registered from the driver program. >>> > >>> > However, I think we've found the culprit. If the join column between >>> two >>> > tables is not in the same column position in both tables, it triggers >>> what >>> > appears to be a bug. 
For example, this program fails: >>> > >>> > import org.apache.spark.SparkContext._ >>> > import org.apache.spark.SparkContext >>> > import org.apache.spark.SparkConf >>> > import org.apache.spark.sql.SQLContext >>> > import org.apache.spark.sql.SchemaRDD >>> > import org.apache.spark.sql.catalyst.types._ >>> > >>> > case class Record(value: String, key: Int) >>> > case class Record2(key: Int, value: String) >>> > >>> > object TestJob { >>> > >>> > def main(args: Array[String]) { >>> > run() >>> > } >>> > >>> > private def run() { >>> > val sparkConf = new SparkConf() >>> > sparkConf.setAppName("TestJob") >>> > sparkConf.set("spark.cores.max", "8") >>> > sparkConf.set("spark.storage.memoryFraction", "0.1") >>> > sparkConf.set("spark.shuffle.memoryFracton", "0.2") >>> > sparkConf.set("spark.executor.memory", "2g") >>> > >>> sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar")) >>> > sparkConf.setMaster(s"spark://dev1.dev.pulse.io:7077") >>> > sparkConf.setSparkHome("/home/pulseio/spark/current") >>> > val sc = new SparkContext(sparkConf) >>> > >>> > val sqlContext = new org.apache.spark.sql.SQLContext(sc) >>> > import sqlContext._ >>> > >>> > val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", >>> i))) >>> > val rdd2 = sc.parallelize((1 to 100).map(i
Re: Error while running Spark SQL join when using Spark 1.0.1
To give a few more details of my environment in case that helps you reproduce: * I'm running spark 1.0.1 downloaded as a tar ball, not built myself * I'm running in stand alone mode, with 1 master and 1 worker, both on the same machine (though the same error occurs with two workers on two machines) * I'm using spark-core and spark-sql 1.0.1 pulled via maven Here's my built.sbt: name := "spark-test" version := "1.0" scalaVersion := "2.10.4" resolvers += "Akka Repository" at "http://repo.akka.io/releases/"; resolvers += "Cloudera Repository" at " https://repository.cloudera.com/artifactory/cloudera-repos/"; libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.0.1" % "provided" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.0.1" % "provided" On Tue, Jul 15, 2014 at 12:21 PM, Zongheng Yang wrote: > FWIW, I am unable to reproduce this using the example program locally. > > On Tue, Jul 15, 2014 at 11:56 AM, Keith Simmons > wrote: > > Nope. All of them are registered from the driver program. > > > > However, I think we've found the culprit. If the join column between two > > tables is not in the same column position in both tables, it triggers > what > > appears to be a bug. For example, this program fails: > > > > import org.apache.spark.SparkContext._ > > import org.apache.spark.SparkContext > > import org.apache.spark.SparkConf > > import org.apache.spark.sql.SQLContext > > import org.apache.spark.sql.SchemaRDD > > import org.apache.spark.sql.catalyst.types._ > > > > case class Record(value: String, key: Int) > > case class Record2(key: Int, value: String) > > > > object TestJob { > > > > def main(args: Array[String]) { > > run() > > } > > > > private def run() { > > val sparkConf = new SparkConf() > > sparkConf.setAppName("TestJob") > > sparkConf.set("spark.cores.max", "8") > > sparkConf.set("spark.storage.memoryFraction", "0.1") > > sparkConf.set("spark.shuffle.memoryFracton", "0.2") > > sparkConf.set("spark.executor.memory", "2g") > > > sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar")) > > sparkConf.setMaster(s"spark://dev1.dev.pulse.io:7077") > > sparkConf.setSparkHome("/home/pulseio/spark/current") > > val sc = new SparkContext(sparkConf) > > > > val sqlContext = new org.apache.spark.sql.SQLContext(sc) > > import sqlContext._ > > > > val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i))) > > val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i"))) > > rdd1.registerAsTable("rdd1") > > rdd2.registerAsTable("rdd2") > > > > sql("SELECT * FROM rdd1").collect.foreach { row => println(row) } > > > > sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on > > rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => > > println(row) } > > > > sc.stop() > > } > > > > } > > > > If you change the definition of Record and Record2 to the following, it > > succeeds: > > > > case class Record(key: Int, value: String) > > case class Record2(key: Int, value: String) > > > > as does: > > > > case class Record(value: String, key: Int) > > case class Record2(value: String, key: Int) > > > > Let me know if you need anymore details. > > > > > > On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust < > mich...@databricks.com> > > wrote: > >> > >> Are you registering multiple RDDs of case classes as tables > concurrently? > >> You are possibly hitting SPARK-2178 which is caused by SI-6240. 
> >> > >> > >> On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons < > keith.simm...@gmail.com> > >> wrote: > >>> > >>> HI folks, > >>> > >>> I'm running into the following error when trying to perform a join in > my > >>> code: > >>> > >>> java.lang.NoClassDefFoundError: Could not initialize class > >>> org.apache.spark.sql.catalyst.types.LongType$ > >>> > >>> I see similar errors for StringType$ and also: > >>> > >>> scala.reflect.runtime.ReflectError: value apache is not a package. > >>> > >>> Strangely, if I just work with a single table, everything is fine. I > can > >>> iterate through the records in both tables and print them out without a > >>> problem. > >>> > >>> Furthermore, this code worked without an exception in Spark 1.0.0 > >>> (thought the join caused some field corruption, possibly related to > >>> https://issues.apache.org/jira/browse/SPARK-1994). The data is > coming from > >>> a custom protocol buffer based format on hdfs that is being mapped > into the > >>> individual record types without a problem. > >>> > >>> The immediate cause seems to be a task trying to deserialize one or > more > >>> SQL case classes before loading the spark uber jar, but I have no idea > why > >>> this is happening, or why it only happens when I do a join. Ideas? > >>> > >>> Keith > >>> > >>> P.S. If it's relevant, we're using the Kryo serializer. > >>> > >>> > >> > > >
Re: Error while running Spark SQL join when using Spark 1.0.1
Nope. All of them are registered from the driver program. However, I think we've found the culprit. If the join column between two tables is not in the same column position in both tables, it triggers what appears to be a bug. For example, this program fails: import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.sql.SQLContext import org.apache.spark.sql.SchemaRDD import org.apache.spark.sql.catalyst.types._ case class Record(value: String, key: Int) case class Record2(key: Int, value: String) object TestJob { def main(args: Array[String]) { run() } private def run() { val sparkConf = new SparkConf() sparkConf.setAppName("TestJob") sparkConf.set("spark.cores.max", "8") sparkConf.set("spark.storage.memoryFraction", "0.1") sparkConf.set("spark.shuffle.memoryFracton", "0.2") sparkConf.set("spark.executor.memory", "2g") sparkConf.setJars(List("target/scala-2.10/spark-test-assembly-1.0.jar")) sparkConf.setMaster(s"spark://dev1.dev.pulse.io:7077") sparkConf.setSparkHome("/home/pulseio/spark/current") val sc = new SparkContext(sparkConf) val sqlContext = new org.apache.spark.sql.SQLContext(sc) import sqlContext._ val rdd1 = sc.parallelize((1 to 100).map(i => Record(s"val_$i", i))) val rdd2 = sc.parallelize((1 to 100).map(i => Record2(i, s"rdd_$i"))) rdd1.registerAsTable("rdd1") rdd2.registerAsTable("rdd2") sql("SELECT * FROM rdd1").collect.foreach { row => println(row) } sql("SELECT rdd1.key, rdd1.value, rdd2.value FROM rdd1 join rdd2 on rdd1.key = rdd2.key order by rdd1.key").collect.foreach { row => println(row) } sc.stop() } } If you change the definition of Record and Record2 to the following, it succeeds: case class Record(key: Int, value: String) case class Record2(key: Int, value: String) as does: case class Record(value: String, key: Int) case class Record2(value: String, key: Int) Let me know if you need anymore details. On Tue, Jul 15, 2014 at 11:14 AM, Michael Armbrust wrote: > Are you registering multiple RDDs of case classes as tables concurrently? > You are possibly hitting SPARK-2178 > <https://issues.apache.org/jira/browse/SPARK-2178> which is caused by > SI-6240 <https://issues.scala-lang.org/browse/SI-6240>. > > > On Tue, Jul 15, 2014 at 10:49 AM, Keith Simmons > wrote: > >> HI folks, >> >> I'm running into the following error when trying to perform a join in my >> code: >> >> java.lang.NoClassDefFoundError: Could not initialize class >> org.apache.spark.sql.catalyst.types.LongType$ >> >> I see similar errors for StringType$ and also: >> >> scala.reflect.runtime.ReflectError: value apache is not a package. >> >> Strangely, if I just work with a single table, everything is fine. I can >> iterate through the records in both tables and print them out without a >> problem. >> >> Furthermore, this code worked without an exception in Spark 1.0.0 >> (thought the join caused some field corruption, possibly related to >> https://issues.apache.org/jira/browse/SPARK-1994 >> <https://www.google.com/url?q=https%3A%2F%2Fissues.apache.org%2Fjira%2Fbrowse%2FSPARK-1994&sa=D&sntz=1&usg=AFQjCNHNxePxWgmuymCQSprulDZZcOn4-Q>). >> The data is coming from a custom protocol buffer based format on hdfs that >> is being mapped into the individual record types without a problem. >> >> The immediate cause seems to be a task trying to deserialize one or more >> SQL case classes before loading the spark uber jar, but I have no idea why >> this is happening, or why it only happens when I do a join. Ideas? >> >> Keith >> >> P.S. 
If it's relevant, we're using the Kryo serializer. >> >> >> >
Error while running Spark SQL join when using Spark 1.0.1
Hi folks, I'm running into the following error when trying to perform a join in my code: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.sql.catalyst.types.LongType$ I see similar errors for StringType$ and also: scala.reflect.runtime.ReflectError: value apache is not a package. Strangely, if I just work with a single table, everything is fine. I can iterate through the records in both tables and print them out without a problem. Furthermore, this code worked without an exception in Spark 1.0.0 (though the join caused some field corruption, possibly related to https://issues.apache.org/jira/browse/SPARK-1994). The data is coming from a custom protocol buffer based format on hdfs that is being mapped into the individual record types without a problem. The immediate cause seems to be a task trying to deserialize one or more SQL case classes before loading the spark uber jar, but I have no idea why this is happening, or why it only happens when I do a join. Ideas? Keith P.S. If it's relevant, we're using the Kryo serializer.
Re: Comparative study
Good point. Shows how personal use cases color how we interpret products. On Wed, Jul 9, 2014 at 1:08 AM, Sean Owen wrote: > On Wed, Jul 9, 2014 at 1:52 AM, Keith Simmons wrote: > >> Impala is *not* built on map/reduce, though it was built to replace >> Hive, which is map/reduce based. It has its own distributed query engine, >> though it does load data from HDFS, and is part of the hadoop ecosystem. >> Impala really shines when your >> > > (It was not built to replace Hive. It's purpose-built to make interactive > use with a BI tool feasible -- single-digit second queries on huge data > sets. It's very memory hungry. Hive's architecture choices and legacy code > have been throughput-oriented, and can't really get below minutes at scale, > but, remains a right choice when you are in fact doing ETL!) >
Re: Comparative study
Santosh, To add a bit more to what Nabeel said, Spark and Impala are very different tools. Impala is *not* built on map/reduce, though it was built to replace Hive, which is map/reduce based. It has its own distributed query engine, though it does load data from HDFS, and is part of the hadoop ecosystem. Impala really shines when your entire dataset fits into memory and your processing can be expressed in terms of sql. Paired with the column-oriented Parquet format, it can really scream with the right dataset. Spark also has a SQL layer (formerly Shark, now more tightly integrated with Spark), but at least for our dataset, Impala was faster. However, Spark has a fantastic and far more flexible programming model. As has been mentioned a few times in this thread, it has a better batch processing model than map/reduce, it can do stream processing, and in the newest release, it looks like it can even mix and match sql queries. You do need to be more aware of memory issues than map/reduce, since using more memory is one of the primary sources of Spark's speed, but with that caveat, it's a great technology. In our particular workflow, we're replacing map/reduce with spark for our batch layer and using Impala for our query layer. Daniel, For what it's worth, we've had a bunch of hanging issues because the garbage collector seems to get out of control. The most effective technique has been to dramatically increase the numPartition argument in our various groupBy and cogroup calls, which reduces the per-task memory requirements. We also reduced the memory used by the shuffler (spark.shuffle.memoryFraction) and turned off RDD memory (since we don't have any iterative algorithms). Finally, using kryo delivered a huge performance and memory boost (even without registering any custom serializers). Keith On Tue, Jul 8, 2014 at 2:58 PM, Robert James wrote: > As a new user, I can definitely say that my experience with Spark has > been rather raw. The appeal of interactive, batch, and in between all > using more or less straight Scala is unarguable. But the experience > of deploying Spark has been quite painful, mainly about gaps between > compile time and run time to the JVM, due to dependency conflicts, > having to use uber jars, Spark's own uber jar which includes some very > old libs, etc. > > What's more, there's very little resources available to help. Some > times I've been able to get help via public sources, but, more often > than not, it's been trial and error. Enough that, despite Spark's > unmistakable appeal, we are seriously considering dropping it entirely > and just doing a classical Hadoop. > > On 7/8/14, Surendranauth Hiraman wrote: > > Aaron, > > > > I don't think anyone was saying Spark can't handle this data size, given > > testimony from the Spark team, Bizo, etc., on large datasets. This has kept > > us trying different things to get our flow to work over the course of > > several weeks. > > > > Agreed that the first instinct should be "what did I do wrong". > > > > I believe that is what every person facing this issue has done, in reaching > > out to the user group repeatedly over the course of the few of months that > > I've been active here. I also know other companies (all experienced with > > large production datasets on other platforms) facing the same types of > > issues - flows that run on subsets of data but not the whole production > > set. > > > > So I think, as you are saying, it points to the need for further > > diagnostics. 
And maybe also some type of guidance on typical issues with > > different types of datasets (wide rows, narrow rows, etc.), flow > > topologies. etc.? Hard to tell where we are going wrong right now. We've > > tried many things over the course of 6 weeks or so. > > > > I tried to look for the professional services link on databricks.com but > > didn't find it. ;-) (jk). > > > > -Suren > > > > > > > > On Tue, Jul 8, 2014 at 4:16 PM, Aaron Davidson > wrote: > > > >> Not sure exactly what is happening but perhaps there are ways to > >>> restructure your program for it to work better. Spark is definitely > able > >>> to > >>> handle much, much larger workloads. > >> > >> > >> +1 @Reynold > >> > >> Spark can handle big "big data". There are known issues with informing > >> the > >> user about what went wrong and how to fix it that we're actively working > >> on, but the first impulse when a job fails should be "what did I do > >> wrong" > >> rather tha
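To make those knobs concrete, here's a sketch of the combination described above (values illustrative, tuned for nothing in particular):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

val conf = new SparkConf()
  .setAppName("tuning-sketch")
  .setMaster("local[4]")  // local smoke test only
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") // helps even with no custom registrations
  .set("spark.shuffle.memoryFraction", "0.2")  // shrink the shuffle buffers
  .set("spark.storage.memoryFraction", "0.1")  // little RDD caching in this flow
val sc = new SparkContext(conf)

val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, i))
// Raising numPartitions on the wide operations reduces per-task memory:
val grouped = pairs.groupByKey(numPartitions = 512)
println(grouped.count())
sc.stop()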
Re: Spark Memory Bounds
Thanks! Sounds like my rough understanding was roughly right :) Definitely understand cached RDDs can add to the memory requirements. Luckily, like you mentioned, you can configure spark to flush that to disk and bound its total size in memory via spark.storage.memoryFraction, so I have a pretty good handle on the overall RDD contribution. Thanks for all the help. Keith On Wed, May 28, 2014 at 6:43 AM, Christopher Nguyen wrote: > Keith, please see inline. > > -- > Christopher T. Nguyen > Co-founder & CEO, Adatao <http://adatao.com> > linkedin.com/in/ctnguyen > > > > On Tue, May 27, 2014 at 7:22 PM, Keith Simmons wrote: > >> A dash of both. I want to know enough that I can "reason about", rather >> than "strictly control", the amount of memory Spark will use. If I have a >> big data set, I want to understand how I can design it so that Spark's >> memory consumption falls below my available resources. Or alternatively, >> if it's even possible for Spark to process a data set over a certain size. >> And if I run into memory problems, I want to know which knobs to turn, and >> how turning those knobs will affect memory consumption. >> > > In practice, to avoid OOME, a key dial we use is the size (or inversely, > number) of the partitions of your dataset. Clearly there is some "blow-up > factor" F such that, e.g., if you start out with 128MB on-disk data > partitions, you would consume 128F MB of memory, both by Spark and by your > closure code. Knowing this, you would want to size the partitions such that > AvailableMemoryInMBPerWorker / NumberOfCoresPerWorker > 128F. To arrive at > F, you could do some back-of-the-envelope modeling, and/or run the job and > observe empirically. > > >> >> It's my understanding that between certain key stages in a Spark DAG >> (i.e. group by stages), Spark will serialize all data structures necessary >> to continue the computation at the next stage, including closures. So in >> theory, per machine, Spark only needs to hold the transient memory required >> to process the partitions assigned to the currently active tasks. Is my >> understanding correct? Specifically, once a key/value pair is serialized >> in the shuffle stage of a task, are the references to the raw java objects >> released before the next task is started. >> > > Yes, that is correct in non-cached mode. At the same time, Spark also does > something else optionally, which is to keep the data structures (RDDs) > persistent in memory (*). As such it is possible partitions that are not > being actively worked on to be consuming memory. Spark will spill all these > to local disk if they take up more memory than it is allowed to take. So > the key thing to worry about is less about what Spark does (apart of > overhead and yes, the possibility of bugs that need to be fixed), and more > about what your closure code does with JVM memory as a whole. If in doubt, > refer back to the "blow-up factor" model described above. > > (*) this is a fundamentally differentiating feature of Spark over a range > of other "in-memory" architectures, that focus on raw-data or transient > caches that serve non-equivalent purposes when viewed from the application > level. It allows for very fast access to ready-to-consume high-level data > structures, as long as available RAM permits. > > >> >> >> On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen wrote: >> >>> Keith, do you mean "bound" as in (a) strictly control to some >>> quantifiable limit, or (b) try to minimize the amount used by each task? 
>>> >>> If "a", then that is outside the scope of Spark's memory management, >>> which you should think of as an application-level (that is, above JVM) >>> mechanism. In this scope, Spark "voluntarily" tracks and limits the amount >>> of memory it uses for explicitly known data structures, such as RDDs. What >>> Spark cannot do is, e.g., control or manage the amount of JVM memory that a >>> given piece of user code might take up. For example, I might write some >>> closure code that allocates a large array of doubles unbeknownst to Spark. >>> >>> If "b", then your thinking is in the right direction, although quite >>> imperfect, because of things like the example above. We often experience >>> OOME if we're not careful with job partitioning. What I think Spark needs >>> to evolve to is at least to include a mechanism for application-level hints >>> about task memory requirements. We might work on this and
Re: Spark Memory Bounds
A dash of both. I want to know enough that I can "reason about", rather than "strictly control", the amount of memory Spark will use. If I have a big data set, I want to understand how I can design it so that Spark's memory consumption falls below my available resources. Or alternatively, if it's even possible for Spark to process a data set over a certain size. And if I run into memory problems, I want to know which knobs to turn, and how turning those knobs will affect memory consumption. It's my understanding that between certain key stages in a Spark DAG (i.e. group by stages), Spark will serialize all data structures necessary to continue the computation at the next stage, including closures. So in theory, per machine, Spark only needs to hold the transient memory required to process the partitions assigned to the currently active tasks. Is my understanding correct? Specifically, once a key/value pair is serialized in the shuffle stage of a task, are the references to the raw java objects released before the next task is started. On Tue, May 27, 2014 at 6:21 PM, Christopher Nguyen wrote: > Keith, do you mean "bound" as in (a) strictly control to some quantifiable > limit, or (b) try to minimize the amount used by each task? > > If "a", then that is outside the scope of Spark's memory management, which > you should think of as an application-level (that is, above JVM) mechanism. > In this scope, Spark "voluntarily" tracks and limits the amount of memory > it uses for explicitly known data structures, such as RDDs. What Spark > cannot do is, e.g., control or manage the amount of JVM memory that a given > piece of user code might take up. For example, I might write some closure > code that allocates a large array of doubles unbeknownst to Spark. > > If "b", then your thinking is in the right direction, although quite > imperfect, because of things like the example above. We often experience > OOME if we're not careful with job partitioning. What I think Spark needs > to evolve to is at least to include a mechanism for application-level hints > about task memory requirements. We might work on this and submit a PR for > it. > > -- > Christopher T. Nguyen > Co-founder & CEO, Adatao <http://adatao.com> > linkedin.com/in/ctnguyen > > > > On Tue, May 27, 2014 at 5:33 PM, Keith Simmons wrote: > >> I'm trying to determine how to bound my memory use in a job working with >> more data than can simultaneously fit in RAM. From reading the tuning >> guide, my impression is that Spark's memory usage is roughly the following: >> >> (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory >> used by all currently running tasks >> >> I can bound A with spark.storage.memoryFraction and I can bound B with >> spark.shuffle.memoryFraction. >> I'm wondering how to bound C. >> >> It's been hinted at a few times on this mailing list that you can reduce >> memory use by increasing the number of partitions. That leads me to >> believe that the amount of transient memory is roughly follows: >> >> total_data_set_size/number_of_partitions * >> number_of_tasks_simultaneously_running_per_machine >> >> Does this sound right? In other words, as I increase the number of >> partitions, the size of each partition will decrease, and since each task >> is processing a single partition and there are a bounded number of tasks in >> flight, my memory use has a rough upper limit. >> >> Keith >> > >
Spark Memory Bounds
I'm trying to determine how to bound my memory use in a job working with more data than can simultaneously fit in RAM. From reading the tuning guide, my impression is that Spark's memory usage is roughly the following: (A) In-Memory RDD use + (B) In memory Shuffle use + (C) Transient memory used by all currently running tasks I can bound A with spark.storage.memoryFraction and I can bound B with spark.shuffle.memoryFraction. I'm wondering how to bound C. It's been hinted at a few times on this mailing list that you can reduce memory use by increasing the number of partitions. That leads me to believe that the amount of transient memory roughly follows: total_data_set_size/number_of_partitions * number_of_tasks_simultaneously_running_per_machine Does this sound right? In other words, as I increase the number of partitions, the size of each partition will decrease, and since each task is processing a single partition and there are a bounded number of tasks in flight, my memory use has a rough upper limit. Keith
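To put rough numbers on that formula (purely illustrative): a 64 GB dataset in 512 partitions is ~128 MB per partition, so 8 concurrently running tasks per machine hold roughly 128 MB * 8 = 1 GB of transient data, plus whatever blow-up deserialization adds. A sketch of the three bounds together (path and values are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("memory-bounds-sketch")
  .setMaster("local[8]")                       // 8 concurrent tasks
  .set("spark.storage.memoryFraction", "0.3")  // bound (A): cached RDDs
  .set("spark.shuffle.memoryFraction", "0.2")  // bound (B): shuffle buffers
val sc = new SparkContext(conf)

// Bound (C): more partitions means a smaller per-task transient footprint.
val data = sc.textFile("hdfs:///path/to/data", 512)  // placeholder path
println(data.count())
sc.stop()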
Re: TriangleCount & Shortest Path under Spark
The triangle count failed for me when I ran it on more than one node. There was this assertion in TriangleCount.scala: // double count should be even (divisible by two) assert((dblCount & 1) == 0) That did not hold true when I ran this on multiple nodes, even when following the guidelines to make sure that all source ids are greater than destination ids and partitioning the graph using RandomVertexCut. I didn't dig into the code to see why this assertion was failing, but commenting that line out allowed the code to run. I'm not sure how much I can trust the results, but they looked generally right. Not sure if this is the failure you are talking about or not. As far as shortest path, the programming guide had an example that worked well for me under https://spark.incubator.apache.org/docs/latest/graphx-programming-guide.html#pregel-api . Keith On Sun, Mar 9, 2014 at 5:52 PM, yxzhao wrote: > Hi All, > > I have already set up Spark-0.9.0-incubating on our school's cluster. I > successfully run the Spark PageRank demo located in > /spark-0.9.0-incubating/examples/src/main/scala/org/apache/spark/examples. > > Problem 1. I want to run the TriangleCount whose source code located > > in/spark-0.9.0-incubating/graphx/src/main/scala/org/apache/spark/graphx/lib. > I used the following command. > ./bin/run-example org.apache.spark.graphx.GraphOps.TriangleCount > spark://10.1.255.206:7077 > But it did not work. Is there any mistake in my command line? Could > anybody > let me know how to correctly run the TriangleCount demo. Thanks very much. > > Problem 2. Does anybody have shortest path implementation code under > Spark? > If so could you share it with me? > > Thanks in advance, > Regards, > Joey > > > > > > -- > View this message in context: > http://apache-spark-user-list.1001560.n3.nabble.com/TriangleCount-Shortest-Path-under-Spark-tp2438.html > Sent from the Apache Spark User List mailing list archive at Nabble.com. >
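For reference, here's roughly what that Pregel example boils down to, on a toy weighted graph (edge list and source id are made up; run in spark-shell, which provides sc):

import org.apache.spark.graphx._

// Toy weighted graph.
val edges = sc.parallelize(Seq(
  Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0), Edge(1L, 3L, 5.0)))
val graph = Graph.fromEdges(edges, defaultValue = 0L)

val sourceId: VertexId = 1L
// Distance 0 at the source, infinity everywhere else.
val initialGraph = graph.mapVertices((id, _) =>
  if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = initialGraph.pregel(Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),  // keep the best distance seen
  triplet =>                                       // relax edges
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr)
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    else Iterator.empty,
  (a, b) => math.min(a, b)                         // merge incoming messages
)
println(sssp.vertices.collect.mkString("\n"))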