Left Join Yields Results And Not Results
Hi,

I'm using pyspark (1.6.2) to do a little bit of ETL and have noticed a very odd situation. I have two dataframes, base and updated. The "updated" dataframe contains a constrained subset of the data from "base" that I wish to exclude. Something like this:

    updated = base.where(base.X == F.lit(1000))

It's more complicated than that, but you get the idea. Later, I do a left join:

    base.join(updated, 'Core_Column', 'left_outer')

This should return all rows in base, with nulls where updated doesn't have an equality match. And that's almost true, but here's where it gets strange:

    base.join(updated, 'Core_Column', 'left_outer').select(base.FieldId, updated.FieldId, 'updated.*').show()

    |FieldId|FieldId|FieldId|x|y|z|
    |    123|    123|   null|1|2|3|

Now I understand why base.FieldId shows 123, but why does updated.FieldId show 123 as well, when the expanded 'updated.*' shows null for FieldId? I can do what I want by dropping down to an RDD, but I was hoping to avoid bypassing Tungsten. It almost feels like it's optimizing the column reference based on the join. But I tested other fields as well, and they also came back with values from base. Very odd.

Any thoughts?

Aaron
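For reference, the semantics the poster expects can be sketched in plain Python (names illustrative, not the actual schema): a left outer join keeps every base row and yields null/None where updated has no match. The pyspark behavior described above is a commonly reported pitfall of joining a dataframe against a filtered copy of itself, where column references from both sides can resolve to the same underlying attribute; aliasing the dataframes before joining is the commonly suggested workaround.

```python
# A minimal, plain-Python sketch of left-outer-join semantics.
# "Core_Column" / "FieldId" mirror the post; the values are made up.
base    = [{"Core_Column": "A", "FieldId": 123},
           {"Core_Column": "B", "FieldId": 456}]
updated = [{"Core_Column": "B", "FieldId": 456}]

# Index the right side by the join key.
lookup = {row["Core_Column"]: row for row in updated}

# Every base row survives; the right side is None (null) on a miss.
joined = [(b["FieldId"], lookup.get(b["Core_Column"], {}).get("FieldId"))
          for b in base]
print(joined)  # [(123, None), (456, 456)]
```

This is what the expanded 'updated.*' output shows, and is why the non-null 123 coming back from updated.FieldId looks like the column resolved against base instead.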
Heavy Stage Concentration - Ends With Failure
Hi,

I have a cluster with 15 nodes, of which 5 are HDFS nodes. I kick off a job that creates some 120 stages. Eventually, the active and pending stages reduce down to a small bottleneck, and it never fails: the 10 (or so) running tasks are always allocated to the same executor on the same host. Sooner or later, it runs out of memory, or some other resource. It falls over, and then the tasks are reallocated to another executor.

Why do we see such heavy concentration of tasks onto a single executor when other executors are free? Were the tasks assigned to an executor when the job was decomposed into stages?
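One thing worth ruling out here (an assumption, not a diagnosis): with only 5 HDFS nodes in a 15-node cluster, delay scheduling can keep retrying for node-local placement on the few executors colocated with the data blocks, concentrating tasks there. Lowering spark.locality.wait lets the scheduler fall back to rack-local or arbitrary executors sooner. A sketch of the setting (your_job.py is a placeholder):

```shell
# 3s is the default locality wait; lowering it (or setting 0 to disable
# the wait entirely) lets tasks spread to non-local executors sooner.
spark-submit \
  --conf spark.locality.wait=1s \
  your_job.py
```

Whether this is the cause can be checked in the UI: if the concentrated tasks show NODE_LOCAL locality while idle executors sit elsewhere, locality wait is a likely suspect.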
S3A Creating Task Per Byte (pyspark / 1.6.1)
I'm using Spark 1.6.1 (hadoop-2.6) and I'm trying to load a file that's in S3. I've done this previously with Spark 1.5 with no issue. I'm attempting to load and count a single file as follows:

    dataFrame = sqlContext.read.text('s3a://bucket/path-to-file.csv')
    dataFrame.count()

But when it attempts to load, it creates 279K tasks. When I look at the tasks, the number of tasks is identical to the number of bytes in the file. Has anyone seen anything like this, or have any ideas why it's getting that granular?
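This matches a known symptom (an assumption on my part that it applies here) of older S3A builds reporting a block size of 0 from getFileStatus, which makes the input-split arithmetic degenerate to one split per byte; later Hadoop releases fixed it. A possible workaround, if that is the cause, is to force a sane S3A block size explicitly (your_job.py is a placeholder):

```shell
# spark.hadoop.* properties are copied into the Hadoop configuration.
# 33554432 = 32 MB; any reasonable block size stops the per-byte splits.
spark-submit \
  --conf spark.hadoop.fs.s3a.block.size=33554432 \
  your_job.py
```

The same property can also be set in core-site.xml as fs.s3a.block.size if the fix should apply cluster-wide.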
Re: Best way to determine # of workers
I think the SparkListener is about as close as it gets. That way I can start up the instance (aws, open-stack, vmware, etc.) and simply wait until the SparkListener indicates that the executors are online before starting.

Thanks for the advice.

Aaron

On Fri, Mar 25, 2016 at 10:54 AM, Jacek Laskowski wrote:
> Hi,
>
> You may want to use SparkListener [1] (as the webui does) and listen to
> SparkListenerExecutorAdded and SparkListenerExecutorRemoved.
>
> [1] http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.scheduler.SparkListener
>
> Pozdrawiam,
> Jacek Laskowski
>
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
> On Thu, Mar 24, 2016 at 3:24 PM, Aaron Jackson wrote:
> > Well that's unfortunate; it just means I have to scrape the webui for that
> > information. As to why: I have a cluster that is being increased in size to
> > accommodate the processing requirements of a large set of jobs. It's useful
> > to know when the new workers have joined the Spark cluster. In my specific
> > case, I may be growing the cluster size by a hundred nodes, and if I fail to
> > wait for that initialization to complete, the job will not have enough memory
> > to run my jobs.
> >
> > Aaron
> >
> > On Thu, Mar 24, 2016 at 3:07 AM, Takeshi Yamamuro wrote:
> >> Hi,
> >>
> >> There is no way to get such information from your app.
> >> Why do you need that?
> >>
> >> thanks,
> >> maropu
> >>
> >> On Thu, Mar 24, 2016 at 8:23 AM, Ajaxx wrote:
> >>> I'm building some elasticity into my model and I'd like to know when my
> >>> workers have come online. It appears at present that the API only supports
> >>> getting information about applications. Is there a good way to determine
> >>> how many workers are available?
> >>>
> >>> --
> >>> View this message in context:
> >>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-determine-of-workers-tp26586.html
> >>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >>>
> >>> -
> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>> For additional commands, e-mail: user-h...@spark.apache.org
> >>
> >> --
> >> ---
> >> Takeshi Yamamuro
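As an alternative to scraping the webui's HTML: the standalone master also serves a JSON summary of cluster state (the /json endpoint on the master web UI port, typically 8080), which includes a worker list. A minimal sketch of polling it follows; the payload shape in the sample is an assumption for illustration and should be verified against your Spark version:

```python
import json

def worker_count(master_json_text):
    """Count ALIVE workers in the JSON served by the standalone master
    web UI (e.g. http://master:8080/json). The 'workers'/'state' field
    names are assumptions based on that endpoint's payload."""
    status = json.loads(master_json_text)
    return sum(1 for w in status.get("workers", [])
               if w.get("state") == "ALIVE")

# Hypothetical payload for illustration only:
sample = '{"workers": [{"state": "ALIVE"}, {"state": "ALIVE"}, {"state": "DEAD"}]}'
print(worker_count(sample))  # 2
```

In practice the text would come from something like urllib's urlopen against the master URL, polled in a loop until the count reaches the expected cluster size before submitting the job.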
Re: Best way to determine # of workers
Well, that's unfortunate; it just means I have to scrape the webui for that information. As to why: I have a cluster that is being increased in size to accommodate the processing requirements of a large set of jobs. It's useful to know when the new workers have joined the Spark cluster. In my specific case, I may be growing the cluster size by a hundred nodes, and if I fail to wait for that initialization to complete, the job will not have enough memory to run my jobs.

Aaron

On Thu, Mar 24, 2016 at 3:07 AM, Takeshi Yamamuro wrote:
> Hi,
>
> There is no way to get such information from your app.
> Why do you need that?
>
> thanks,
> maropu
>
> On Thu, Mar 24, 2016 at 8:23 AM, Ajaxx wrote:
>> I'm building some elasticity into my model and I'd like to know when my
>> workers have come online. It appears at present that the API only supports
>> getting information about applications. Is there a good way to determine
>> how many workers are available?
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-determine-of-workers-tp26586.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>
> --
> ---
> Takeshi Yamamuro
Re: Passing parameters to spark SQL
Yeah, that's what I thought. In this specific case, I'm porting over some scripts from an existing RDBMS platform. I had been porting them (slowly) to in-code notation with Python or Scala. However, to expedite my efforts (and presumably theirs, since I'm not doing this forever), I went down the SQL path. The problem is the loss of type information and the possibility of SQL injection. No biggie; it just means that where parameterized queries are in play, we'll have to write them out in-code rather than in SQL.

Thanks,
Aaron

On Sun, Dec 27, 2015 at 8:06 PM, Michael Armbrust wrote:
> The only way to do this for SQL is through the JDBC driver.
>
> However, you can use literal values without lossy/unsafe string
> conversions by using the DataFrame API. For example, to filter:
>
>     import org.apache.spark.sql.functions._
>     df.filter($"columnName" === lit(value))
>
> On Sun, Dec 27, 2015 at 1:11 PM, Ajaxx wrote:
>> Given a SQLContext (or HiveContext), is it possible to pass parameters to a
>> query? There are several reasons why this makes sense, including loss of
>> data type during conversion to string, SQL injection, etc.
>>
>> But currently, it appears that SQLContext.sql() only takes a single
>> parameter, which is a string.
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Passing-parameters-to-spark-SQL-tp25806.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
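The two problems named above (type loss and injection) are exactly what parameterized queries solve, and since SQLContext.sql() only accepts a raw string, Spark SQL can't provide them. A small illustration of the general principle using stdlib sqlite3 (not Spark; chosen only because it is runnable anywhere):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (name TEXT, value INTEGER)")
conn.execute("INSERT INTO t VALUES (?, ?)", ("a", 1))
# A value containing quote characters, as an attacker might supply:
conn.execute("INSERT INTO t VALUES (?, ?)", ("b' OR '1'='1", 2))

# Parameterized: the quotes in the value are treated as data, not SQL,
# and the integer keeps its native type. Only the exact match returns.
rows = conn.execute("SELECT value FROM t WHERE name = ?",
                    ("b' OR '1'='1",)).fetchall()
print(rows)  # [(2,)]

# String interpolation: the same value rewrites the query and matches
# every row -- the injection the post is worried about.
bad = "SELECT value FROM t WHERE name = '%s'" % "b' OR '1'='1"
rows_bad = conn.execute(bad).fetchall()
print(sorted(rows_bad))  # [(1,), (2,)]
```

This is the same reason the DataFrame-API route in the reply below is safe: lit(value) carries the native value into the plan instead of splicing a string into SQL text.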
Increasing memory usage on batch job (pyspark)
Greetings,

I am processing a "batch" of files and have structured an iterative process around them. Each batch is processed by first loading the data with spark-csv, performing some minor transformations, and then writing back out as parquet. Absolutely no caching or shuffle should occur with anything in this process.

I watch memory utilization on each executor and I notice a steady increase in memory with each iteration that completes. Eventually, we reach the memory limit set for the executor, and the process begins to slowly degrade and fail. I'm really unclear about what I am doing that could possibly be causing the executors to hold on to state between iterations. Again, I was careful to make sure there was no caching. I've done most of my testing to date in Python, though I will port it to Scala to see if the behavior is isolated to the runtime.

Spark: 1.5.2

~~ Ajaxx
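One way to rule out the driver side before porting to Scala (an assumption: the growth could equally come from the executors or the JVM, which this won't see) is to verify that per-iteration objects are actually released between batches. A plain-Python sketch of the technique using weakref, with a stand-in class instead of real dataframes:

```python
import gc
import weakref

class Batch(object):
    """Stand-in for whatever per-file state the loop builds."""
    pass

refs = []
for i in range(3):
    batch = Batch()               # per-iteration state
    refs.append(weakref.ref(batch))
    del batch                     # drop the only strong reference
    gc.collect()                  # force a collection, as a test would

# If anything were still holding the batches, these would not all be dead.
print([r() is None for r in refs])  # [True, True, True]
```

Applied to the real loop, a weakref to each iteration's dataframe (or accumulated helper objects) that stays alive after the iteration ends points at a lingering driver-side reference; if they all die, the leak is elsewhere.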