Yarn containers getting killed, error 52, multiple joins
Hi,

I have a Spark 1.6.2 app (previously tested on 2.0.0 as well) that requires a huge amount of memory (1.5 TB) for a small dataset (~500 MB). Memory usage jumps when I loop through and inner join to make the dataset 12 times as wide. The app goes down during or after this loop, when I try to run a logistic regression on the generated dataframe. I'm using the Scala API (2.10), and dynamic resource allocation is configured. Here are the parameters I'm using:

    --master yarn-client
    --queue analyst
    --executor-cores 5
    --executor-memory 40G
    --driver-memory 30G
    --conf spark.memory.fraction=0.75
    --conf spark.yarn.executor.memoryOverhead=5120

Has anyone seen this, or have an idea how to tune it? There is no way it should need this much memory.

Thanks,
Ian
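For context, a minimal sketch of the kind of widening loop described above (the original code is not in the post, so the DataFrame names, the join key, and the per-iteration slice are all assumptions):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col

    // Hypothetical reconstruction: repeatedly inner-joining per-iteration
    // slices onto a base frame by a shared id, making the result ~12x wider.
    // `base`, `features`, "period", and "id" are made-up names.
    var wide: DataFrame = base
    for (i <- 1 to 12) {
      val slice = features.where(col("period") === i)
      wide = wide.join(slice, "id") // inner join by default
    }

Each join extends the logical plan of the previous one, so by iteration 12 the plan (and the shuffle state behind it) is deep, which could be consistent with memory use blowing up well past the raw data size.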
Re: Running Hive and Spark together with Dynamic Resource Allocation
It seems like the best solution is to set yarn.nodemanager.aux-services to mapreduce_shuffle,spark_shuffle, so that both shuffle services are registered.
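For reference, the corresponding yarn-site.xml properties would look something like this (the class names are the stock MapReduce and Spark shuffle handlers; worth double-checking against your HDP build):

    yarn.nodemanager.aux-services = mapreduce_shuffle,spark_shuffle
    yarn.nodemanager.aux-services.mapreduce_shuffle.class = org.apache.hadoop.mapred.ShuffleHandler
    yarn.nodemanager.aux-services.spark_shuffle.class = org.apache.spark.network.yarn.YarnShuffleService

The Spark shuffle service also needs its jar (spark-<version>-yarn-shuffle.jar) on the NodeManager classpath for that class to load.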
Running Hive and Spark together with Dynamic Resource Allocation
Hi,

My team has a cluster running HDP, with Hive and Spark. We set up Spark to use dynamic resource allocation, for benefits such as not having to hard-code the number of executors and freeing resources after use. Everything runs on YARN. The problem is that for dynamic resource allocation to function properly in Spark 1.5.2, we needed to set yarn.nodemanager.aux-services in yarn-site.xml to spark_shuffle, but this breaks Hive (1.2.1), since it looks for auxService:mapreduce_shuffle. Does anyone know of a way to configure things so that both services run smoothly?

Thanks,
Ian
Extra string added to column name? (withColumn & expr)
Hi,

I'm trying to implement a custom one-hot encoder, since I want the output in a specific shape, suitable for Theano. Basically, it should give a new column for each distinct value of the original feature, set to 1 if the observation contains that value: something like feature1.distinct1, feature1.distinct2, ... Here is my attempt, which seems logically sound:

    for (column <- featuresThatNeedEncoding) {
      for (j <- df.select(column).distinct().collect().toSeq) {
        df = df.withColumn(column + "." + j.get(0).toString,
          expr("CASE WHEN " + column + " = '" + j.get(0).toString +
               "' THEN " + column + "." + j.get(0).toString + " = '1'" +
               " ELSE " + column + "." + j.get(0).toString + " = '0' END"))
      }
    }

And some of the stack trace:

    Exception in thread "main" org.apache.spark.sql.AnalysisException: Can't extract value from someFeature#295;
      at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:72)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$4.apply(LogicalPlan.scala:267)
      at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$4.apply(LogicalPlan.scala:266)
      at scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:111)

Any ideas how to resolve this, or why there is a #295 after my column name?

Thanks,
Ian
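Two things seem to be going on here. The #295 is Catalyst's internal expression ID, which it appends to attribute names in error messages. And the dot in column + "." + value is parsed as a struct-field access on the existing column, hence "Can't extract value from someFeature#295". A hedged sketch of a variant that sidesteps both, using an underscore in the generated name and when/otherwise to build the 0/1 indicator directly (names are carried over from the snippet above):

    import org.apache.spark.sql.functions.{col, when, lit}

    // Sketch: avoid dots in generated column names (Catalyst reads them as
    // struct access) and compute the indicator value directly, instead of
    // referencing the not-yet-created column inside its own CASE expression.
    for (column <- featuresThatNeedEncoding) {
      for (j <- df.select(column).distinct().collect().toSeq) {
        val value = j.get(0).toString
        df = df.withColumn(column + "_" + value,
          when(col(column) === value, lit(1)).otherwise(lit(0)))
      }
    }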
Re: Large where clause StackOverflow 1.5.2
I solved this by using a Window partitioned by 'id'. I used lead and lag to create columns which contained nulls in the places I needed to delete in each fold, then removed those rows and dropped the additional columns.
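A rough sketch of that approach (column names and the exact marking logic are assumptions; the real version marks whichever rows each fold should drop):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{lag, lead}

    // Per-id window ordered by date; lead/lag are null at each id's
    // boundaries, which marks the first/last rows for removal.
    val w = Window.partitionBy("id").orderBy("thedate")
    val marked = df
      .withColumn("prev", lag("thedate", 1).over(w))
      .withColumn("next", lead("thedate", 1).over(w))
    val trimmed = marked
      .where("prev is not null and next is not null")
      .drop("prev")
      .drop("next")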
Large where clause StackOverflow 1.5.2
Hi,

I'm trying to implement a folding function in Spark. It takes an input k and a data frame of ids and dates: k=1 is just the data frame; k=2 consists of the min and max date for each id once and the rest twice; k=3 consists of min and max once, min+1 and max-1 twice, and the rest three times; etc. Code in Scala, with variable names changed:

    val acctMinDates = df.groupBy("id").agg(min("thedate"))
    val acctMaxDates = df.groupBy("id").agg(max("thedate"))
    val acctDates = acctMinDates.join(acctMaxDates, "id").collect()

    var filterString = ""
    for (i <- 1 to k - 1) {
      if (i == 1) {
        for (aDate <- acctDates) {
          filterString = filterString + "(id = " + aDate(0) +
            " and thedate > " + aDate(1) +
            " and thedate < " + aDate(2) + ") or "
        }
        filterString = filterString.substring(0, filterString.size - 4)
      }
      df = df.unionAll(df.where(filterString))
    }

This is the code being translated, from pandas/Python:

    df = pd.concat([df.groupby('id').apply(lambda x: pd.concat(
        [x.iloc[i: i + k] for i in range(len(x.index) - k + 1)]))])
Strange behavior including memory leak and NPE
Hi,

I've been fighting a strange situation today. I'm trying to add two entries for each of the distinct rows of an account, except for the first and last (by date). Here's an example of some of the code; I can't get the subset step to continue forward:

    var acctIdList = X_train.select("m_acct_id").distinct()
    acctIdList = acctIdList.filter("m_acct_id is not null")

    for (id <- acctIdList) {
      println("m_acct_id = " + id.getInt(0))
      val subset = X_train.where("m_acct_id in (" + id.getInt(0).toString + ")")
    }

The printlns work if I remove the subsetting logic from the for loop, and a few iterations of the loop work even with the subsetting logic. I'm thinking this might be because the creation of these dataframes in the loop eats up memory too quickly, so I might need a different implementation. This is the logic I'm trying to translate from pandas, if that helps:

    X_train = pd.concat([X_train.groupby('m_acct_id').apply(lambda x: pd.concat(
        [x.iloc[i: i + k] for i in range(len(x.index) - k + 1)]))])

And here is the top of the stack trace; I tried on Spark 1.5.2 and 1.6.2:

    16/07/19 14:39:37 ERROR Executor: Managed memory leak detected; size = 33816576 bytes, TID = 1908
    16/07/19 14:39:37 ERROR Executor: Exception in task 1.0 in stage 96.0 (TID 1908)
    java.lang.NullPointerException
      at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:131)
      at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2126)
      at org.apache.spark.sql.DataFrame.filter(DataFrame.scala:755)
      at org.apache.spark.sql.DataFrame.where(DataFrame.scala:792)

Any advice on how to keep moving would be much appreciated!

Thanks,
Ian
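One detail consistent with that trace: for (id <- acctIdList) over a DataFrame desugars to foreach, which runs the loop body inside executor tasks, and X_train.where then tries to construct a DataFrame on a worker where no SQLContext exists, which would produce exactly this kind of NPE. A hedged sketch that keeps the loop on the driver by collecting the ids first (assumes the id set is small enough to collect):

    // Collect the distinct, non-null ids to the driver, then loop locally;
    // DataFrame operations like where() are only valid on the driver.
    val ids = X_train.select("m_acct_id").distinct()
      .filter("m_acct_id is not null")
      .collect()

    for (row <- ids) {
      val id = row.getInt(0)
      println("m_acct_id = " + id)
      val subset = X_train.where("m_acct_id = " + id)
    }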
Re: Dense Vectors outputs in feature engineering
Or would it be common practice to just retain the original categories in another DataFrame?
Re: Dense Vectors outputs in feature engineering
Thanks Disha, that worked out well. Can you point me to an example of how to decode the feature vectors in the dataframe back into their categories?
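For reference, one possible approach for the index column itself: IndexToString in spark.ml reverses a StringIndexer using the labels stored on the fitted model. A sketch (column names are assumptions, and recovering categories from the one-hot vector would additionally need the same label array):

    import org.apache.spark.ml.feature.{StringIndexer, IndexToString}

    val indexer = new StringIndexer()
      .setInputCol("category")
      .setOutputCol("categoryIndex")
    val model = indexer.fit(df)
    val indexed = model.transform(df)

    // Map indices back to the original strings via the model's label array.
    val converter = new IndexToString()
      .setInputCol("categoryIndex")
      .setOutputCol("originalCategory")
      .setLabels(model.labels)
    val decoded = converter.transform(indexed)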
Dense Vectors outputs in feature engineering
Hi,

I'm trying to use the StringIndexer and OneHotEncoder in order to vectorize some of my features. Unfortunately, OneHotEncoder only returns sparse vectors, and I can't find a way, much less an efficient one, to convert the columns generated by OneHotEncoder into dense vectors. I need this because I will eventually be doing some deep learning on my data, not something internal to Spark. If I were to update OneHotEncoder to have a setDense option, is there much of a chance it would be accepted as a PR? Since the first option seems unlikely, is there a way to change a dataframe with a sparse vector and index columns into plain columns, like the pandas get_dummies method (http://queirozf.com/entries/one-hot-encoding-a-feature-on-a-pandas-dataframe-an-example), or is there a better way to replicate the get_dummies functionality?

Thanks,
Ian
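A possible workaround, sketched under the Spark 1.x mllib vector types (the column and frame names are made up): a UDF that densifies the encoder's output column.

    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.sql.functions.{col, udf}

    // Convert the sparse one-hot output to a DenseVector in a new column.
    val toDense = udf((v: Vector) => v.toDense)
    val densified = encoded.withColumn("featuresDense", toDense(col("features")))

This keeps the data as a single vector column rather than get_dummies-style separate columns, but it does remove the sparsity.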
SparkR interaction with R libraries (currently 1.5.2)
Hi,

I'm trying to figure out how to work with R libraries in Spark properly. I've googled and done some trial and error. The main error I keep running into is:

    cannot coerce class "structure("DataFrame", package = "SparkR")" to a data.frame

I'm wondering if there is a way to use R data.frame functionality on worker nodes, or a way to "hack" the R function so that it accepts Spark DataFrames. Here is an example of what I'm trying to do, with a_df being a Spark DataFrame:

*** DISTRIBUTED ***

    # 0: filter out nulls
    a_df <- filter(a_df, isNotNull(a_df$Ozone))

    # 1: make closure
    treeParty <- function(x) {
      # fit a conditional inference tree with ctree from partykit
      air.ct <- ctree(Ozone ~ ., data = a_df)
    }

    # 2: put package in context
    SparkR:::includePackage(sc, partykit)

    # 3: apply to all partitions
    partied <- SparkR:::lapplyPartition(a_df, treeParty)

*** LOCAL ***

Here is the R code that works with a local data frame, local_df:

    local_df <- subset(airquality, !is.na(Ozone))
    air.ct <- ctree(Ozone ~ ., data = local_df)

Any advice would be greatly appreciated!

Thanks,
Ian