Re: Spark SQL DataFrame: Nullable column and filtering
Dear all,

after some fiddling I have arrived at this solution:

/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

  val joinedDF = leftDF.as('left).join(rightDF.as('right), leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._
  val leftColumns = leftDF.columns.map((cn: String) => $"left.$cn")
  val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map((cn: String) => $"right.$cn")

  joinedDF.select(leftColumns ++ rightColumns: _*)
}

Comments welcome!

Alternatives I tried:

- Not working: If at least the right alias for rightDF is present, one could try

      joinedDF.drop("right." + columnName)

  but this does not work (no column is dropped). Unfortunately, drop does not support arguments of type Column / ColumnName.
  *@Michael: Should I create a feature request in Jira for drop supporting Columns?*

- Working: Without using aliases via as(...), but using column renaming instead:

      rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)

  to rename the right DataFrame column, and then use

      leftDF(commonColumnName) === rightDF("right_" + commonColumnName)

  as the join criterion. In my opinion this is not so neat. Opinions?

Things I observed:

- Column handling does not seem consistent:
  - select() supports aliases, while drop(...) only supports strings.
  - DataFrame.apply() and DataFrame.col also do not support aliases.
  - Thus the only way to handle ambiguous column names at the moment is via select. Can someone please confirm this?
- Alias information is not displayed by DataFrame.printSchema (or at least I did not find a way to show it).

Cheers,

Martin

2015-07-31 22:51 GMT+02:00 Martin Senne martin.se...@googlemail.com:

Dear Michael, dear all,

a minimal example is listed below.
After some further analysis I could figure out that the problem is related to the *leftOuterJoinWithRemovalOfEqualColumn* method, as I use columns of the left and right DataFrames when doing the select on the joined table.

/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

  val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
  val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))

  leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
    .select(leftColumns ++ rightColumns: _*)
}

As the column "y" of the right table has nullable=false, this is then also transferred to the "y" column of the joined table, because I use rightDF("y"). Thus, I need to use columns of the joined table for the select.

*Question now: The joined table has column names "x", "a", "x", "y". How do I discard the second "x" column?*

All my approaches failed (assuming here that joinedDF is the joined DataFrame):

- Using joinedDF.drop("x") discards both "x" columns.
- Using joinedDF("x") does not work, as it is ambiguous.
- Using rightDF.as("aliasname") in order to differentiate the column "x" (from the left DataFrame) from "x" (from the right DataFrame) did not work out either, as I found no way to use select($"aliasname.x") really programmatically. Could someone sketch the code?

Any help welcome, thanks

Martin

import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.sql.{DataFrame, SQLContext}

object OtherEntities {

  case class Record(x: Int, a: String)
  case class Mapping(x: Int, y: Int)

  val records = Seq(Record(1, "hello"), Record(2, "bob"))
  val mappings = Seq(Mapping(2, 5))
}

object MinimalShowcase {

  /**
   * Customized left outer join on common column.
   */
  def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

    val leftColumns = leftDF.columns.map((cn: String) => leftDF(cn))
    val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))

    leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
      .select(leftColumns ++ rightColumns: _*)
  }

  /**
   * Set, if a column is nullable.
   * @param df source DataFrame
   * @param cn is the column name to change
   * @param nullable is the flag to set, such that the column is either nullable or not
   */
  def setNullableStateOfColumn(df: DataFrame, cn: String, nullable: Boolean): DataFrame = {

    val schema = df.schema
    val newSchema = StructType(schema.map {
      case StructField(c, t, _, m) if c.equals(cn) => StructField(c,
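The column-renaming alternative mentioned in this thread can be sketched roughly as follows. This is only an illustration, not the poster's final code: the object name JoinByRenaming, the method name leftOuterJoinViaRename and the "right_" prefix are hypothetical, and the sketch assumes the Spark 1.4 DataFrame API and that the two DataFrames share no column names other than the common one. Because the right-hand key gets a unique name before the join, it can afterwards be dropped by name without touching the left-hand key.

```scala
import org.apache.spark.sql.DataFrame

object JoinByRenaming {

  /**
   * Left outer join on a common column that keeps only the left copy of
   * that column. The "right_" prefix is an arbitrary choice for this sketch.
   */
  def leftOuterJoinViaRename(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
    // Give the right-hand key a unique name so it is no longer ambiguous.
    val renamed = "right_" + commonColumnName
    val renamedRightDF = rightDF.withColumnRenamed(commonColumnName, renamed)

    leftDF
      .join(renamedRightDF, leftDF(commonColumnName) === renamedRightDF(renamed), "leftouter")
      // drop(String) works here because the name is unique in the joined table.
      .drop(renamed)
  }
}
```

With the Record/Mapping example from the thread, the result would have the columns "x", "a", "y". Whether the nullable flag of "y" then reflects the outer join is worth checking with printSchema on your Spark version.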
No event logs in yarn-cluster mode
Hi,

I am trying to configure a history server for my application. When I run locally (./run-example SparkPi), the event logs are created and I can start the history server. But when I try

./spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster file:///opt/hadoop/spark/examples/src/main/python/pi.py file:///opt/hadoop/spark/examples/src/main/python/pi.py

I get

15/08/01 18:18:50 INFO yarn.Client:
    client token: N/A
    diagnostics: N/A
    ApplicationMaster host: 192.168.56.192
    ApplicationMaster RPC port: 0
    queue: default
    start time: 1438445890676
    final status: SUCCEEDED
    tracking URL: http://sp-m1:8088/proxy/application_1438444529840_0009/A
    user: hadoop
15/08/01 18:18:50 INFO util.Utils: Shutdown hook called
15/08/01 18:18:50 INFO util.Utils: Deleting directory /tmp/spark-185f7b83-cb3b-4134-a10c-452366204f74

So it succeeded, but there are no event logs for this application.

Here are my configs:

spark-defaults.conf

spark.master            yarn-cluster
spark.eventLog.dir      /opt/spark/spark-events
spark.eventLog.enabled  true

spark-env.sh

export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"
export SPARK_HISTORY_OPTS="-Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider -Dspark.history.fs.logDirectory=file:/opt/spark/spark-events -Dspark.history.fs.cleaner.enabled=true"

Any ideas?

Thank you
About memory leak in spark 1.4.1
Hi all,

I upgraded Spark to 1.4.1 and many applications failed. I find that the heap memory is not full, but the CoarseGrainedExecutorBackend process takes more memory than I expect, and its usage keeps increasing over time until it finally exceeds the server's limit and the worker dies.

Can anyone help?

Mode: standalone

spark.executor.memory 50g

25583 xiaoju 20 0 75.5g 55g 28m S 1729.3 88.1 2172:52 java

55g is more than 50g.

I apply
Re: How does the # of tasks affect # of threads?
Hello,

I am not an expert with Spark, but the error thrown by Spark seems to indicate that there is not enough memory to launch the job. By default, Spark allocates 1 GB of memory; maybe you should increase it?

Best regards

Fabrice

On Sat, Aug 1, 2015 at 22:51, Connor Zanin cnnr...@udel.edu wrote:

Hello,

I am having an issue when I run a word count job. I have included the source and log files for reference. The job finishes successfully, but about halfway through I get a java.lang.OutOfMemoryError (could not create native thread), and this leads to the loss of the Executor. After some searching I found out this was a problem with the environment and the OS limit on how many threads I could spawn.

However, I had thought that Spark only maintained a thread pool equal in size to the number of cores available across the nodes (by default), and scheduled tasks dynamically as threads become available. The only Spark parameter I change is the number of partitions in my RDD.

My question is, how does Spark decide how many threads to spawn, and when?

--
Regards,
Connor Zanin
Computer Science
University of Delaware

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: TCP/IP speedup
https://spark-summit.org/2015/events/making-sense-of-spark-performance/ On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus edel...@gmail.com wrote: Hi All! How important would be a significant performance improvement to TCP/IP itself, in terms of overall job performance improvement. Which part would be most significantly accelerated? Would it be HDFS? -- ttfn Simon Edelhaus California 2015
Re: TCP/IP speedup
H 2% huh. -- ttfn Simon Edelhaus California 2015 On Sat, Aug 1, 2015 at 3:45 PM, Mark Hamstra m...@clearstorydata.com wrote: https://spark-summit.org/2015/events/making-sense-of-spark-performance/ On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus edel...@gmail.com wrote: Hi All! How important would be a significant performance improvement to TCP/IP itself, in terms of overall job performance improvement. Which part would be most significantly accelerated? Would it be HDFS? -- ttfn Simon Edelhaus California 2015
Re: No event logs in yarn-cluster mode
On Sat, Aug 1, 2015 at 9:25 AM, Akmal Abbasov akmal.abba...@icloud.com wrote:

When I running locally(./run-example SparkPi), the event logs are being created, and I can start history server. But when I am trying ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster file:///opt/hadoop/spark/examples/src/main/python/pi.py

Did you look for the event log on the machine where the Spark driver ran? You're using a "file:" URL, and in yarn-cluster mode the driver runs on some arbitrary machine in the cluster, not on the local machine that launched the job. That is why you should probably write these logs to HDFS.
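As a rough sketch of that suggestion, the two event-log settings from the original question could be pointed at a shared HDFS directory instead of a node-local path. The directory hdfs:///spark-events and the names EventLogConf and withHdfsEventLog are assumptions made for this illustration; the configuration keys themselves are the ones already used in the question.

```scala
import org.apache.spark.SparkConf

object EventLogConf {

  // Hypothetical HDFS location; use a directory that exists in your cluster
  // and is writable by the user running the job.
  val eventLogDir = "hdfs:///spark-events"

  /** Returns the given conf with event logging enabled and directed to HDFS. */
  def withHdfsEventLog(conf: SparkConf): SparkConf =
    conf
      .set("spark.eventLog.enabled", "true")
      .set("spark.eventLog.dir", eventLogDir)
}
```

The same two keys can equally be set in spark-defaults.conf. For the history server to find the logs, spark.history.fs.logDirectory would then need to point at the same HDFS directory.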
Re: Does anyone have experience with using Hadoop InputFormats?
Sent from my iPad

On 2014-9-24, at 8:13 AM, Steve Lewis lordjoe2...@gmail.com wrote:

When I experimented with using an InputFormat I had used in Hadoop for a long time, I found:

1) It must extend org.apache.hadoop.mapred.FileInputFormat (the deprecated class), not org.apache.hadoop.mapreduce.lib.input.FileInputFormat.
2) initialize needs to be called in the constructor.
3) The type (mine was "extends FileInputFormat<Text, Text>") must not be a Hadoop Writable, as those are not serializable, but "extends FileInputFormat<StringBuffer, StringBuffer>" does work. I don't think this is allowed in Hadoop.

Are these statements correct? If so, it seems like most Hadoop InputFormats, certainly the custom ones I create, require serious modifications to work. Does anyone have samples of using a Hadoop InputFormat?

Since I am working on problems where a directory with multiple files is processed, and some files are many gigabytes in size with multiline complex records, an input format is a requirement.
Re: How does the # of tasks affect # of threads?
1. I believe that the default memory (per executor) is 512m (from the documentation).
2. I have increased the memory used by Spark on the workers in my launch script when submitting the job (--executor-memory 124g).
3. The job completes successfully; it is the road bumps in the middle I am concerned with.

I would like insight into how Spark handles thread creation.

On Sat, Aug 1, 2015 at 5:33 PM, Fabrice Sznajderman fab...@gmail.com wrote:

Hello, I am not an expert with Spark, but the error thrown by spark seems indicate that not enough memory for launching job. By default, Spark allocated 1GB for memory, may be you should increase it ? Best regards Fabrice

On Sat, Aug 1, 2015 at 22:51, Connor Zanin cnnr...@udel.edu wrote:

Hello,

I am having an issue when I run a word count job. I have included the source and log files for reference. The job finishes successfully, but about halfway through I get a java.lang.OutOfMemoryError (could not create native thread), and this leads to the loss of the Executor. After some searching I found out this was a problem with the environment and the limit by the OS on how many threads I could spawn.

However, I had thought that Spark only maintained a thread pool equal in size to the number of cores available across the nodes (by default), and schedules tasks dynamically as threads become available. The only Spark parameter I change is the number of partitions in my RDD.

My question is, how is Spark deciding how many threads to spawn and when?

--
Regards,
Connor Zanin
Computer Science
University of Delaware
Re: Spark Number of Partitions Recommendations
You should also take into account the amount of memory that you plan to use. It's advised not to give too much memory to each executor; otherwise GC overhead will go up.

Btw, why prime numbers?

--
Ruslan Dautkhanov

On Wed, Jul 29, 2015 at 3:31 AM, ponkin alexey.pon...@ya.ru wrote:

Hi Rahul,

Where did you see such a recommendation? I personally define partitions with the following formula:

partitions = nextPrimeNumberAbove( K * (--num-executors * --executor-cores) )

where
nextPrimeNumberAbove(x) - the prime number which is greater than x
K - a multiplier; start with 1 and increase it until join performance starts to degrade

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Number-of-Partitions-Recommendations-tp24022p24059.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
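For concreteness, the quoted formula can be written down as a small Scala helper. This is a sketch under one assumption: "the prime number which is greater than x" is read as the smallest prime strictly greater than x. The object name PartitionCount and the helper isPrime are made up for the illustration; only nextPrimeNumberAbove and K come from the message.

```scala
object PartitionCount {

  /** True if n is prime (trial division, adequate for partition-sized numbers). */
  def isPrime(n: Int): Boolean =
    n >= 2 && (2 to math.sqrt(n.toDouble).toInt).forall(n % _ != 0)

  /** Smallest prime strictly greater than x (assumed reading of the formula). */
  def nextPrimeNumberAbove(x: Int): Int =
    Iterator.from(x + 1).find(isPrime).get

  /** partitions = nextPrimeNumberAbove(K * (num-executors * executor-cores)) */
  def partitions(k: Int, numExecutors: Int, executorCores: Int): Int =
    nextPrimeNumberAbove(k * numExecutors * executorCores)
}
```

For example, with K = 1, 7 executors and 16 cores each, the product is 112 and PartitionCount.partitions(1, 7, 16) yields 113.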
TCP/IP speedup
Hi All!

How important would a significant performance improvement to TCP/IP itself be, in terms of overall job performance improvement? Which part would be most significantly accelerated? Would it be HDFS?

--
ttfn
Simon Edelhaus
California 2015
Re: TCP/IP speedup
If your network is bandwidth-bound, setting jumbo frames (MTU 9000) may increase bandwidth by up to ~20%.

http://docs.hortonworks.com/HDP2Alpha/index.htm#Hardware_Recommendations_for_Hadoop.htm
"Enabling Jumbo Frames across the cluster improves bandwidth"

If the Spark workload is not network bandwidth-bound, I would expect anything from a few percent to no improvement.

--
Ruslan Dautkhanov

On Sat, Aug 1, 2015 at 6:08 PM, Simon Edelhaus edel...@gmail.com wrote:

H 2% huh.

--
ttfn
Simon Edelhaus
California 2015

On Sat, Aug 1, 2015 at 3:45 PM, Mark Hamstra m...@clearstorydata.com wrote:

https://spark-summit.org/2015/events/making-sense-of-spark-performance/

On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus edel...@gmail.com wrote:

Hi All! How important would be a significant performance improvement to TCP/IP itself, in terms of overall job performance improvement. Which part would be most significantly accelerated? Would it be HDFS?

--
ttfn
Simon Edelhaus
California 2015
Re: flatMap output on disk / flatMap memory overhead
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized format, MEMORY_AND_DISK_SER or MEMORY_ONLY_SER? I.e. changing your

rdd.persist(MEMORY_AND_DISK)

to

rdd.persist(MEMORY_ONLY_SER)

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid iras...@cloudera.com wrote:

I agree with Richard. It looks like the issue here is shuffling, and shuffle data is always written to disk, so the issue is definitely not that all the output of flatMap has to be stored in memory. If at all possible, I'd first suggest upgrading to a new version of Spark -- even in 1.2, there were big improvements to shuffle, with sort-based shuffle as the default.

On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher rmarsc...@localytics.com wrote:

Are you sure it's memory related? What is the disk utilization and IO performance on the workers? The error you posted looks to be related to shuffle trying to obtain block data from another worker node and failing to do so in a reasonable amount of time. It may still be memory related, but I'm not sure that other resources are ruled out yet.

On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea octavian.ga...@inf.ethz.ch wrote:

I tried using reduceByKey, without success. I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey . However, I got the same error as before, namely the error described here:
http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

My task is to count the frequencies of pairs of words that occur in a set of documents at least 5 times. I know that this final output is sparse and should comfortably fit in memory. However, the intermediate pairs that are spilled by flatMap might need to be stored on disk, and I don't understand why the persist option does not work and my job fails.

My code:

rdd.persist(StorageLevel.MEMORY_AND_DISK)
  .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1,word2) , 1)
  .reduceByKey((a,b) => (a + b).toShort)
  .filter({case((x,y),count) => count >= 5})

My cluster has 8 nodes, each with 129 GB of RAM and 16 cores per node. I keep one node for the master and 7 nodes for the workers.

my conf:

conf.set("spark.cores.max", "128")
conf.set("spark.akka.frameSize", "1024")
conf.set("spark.executor.memory", "115g")
conf.set("spark.shuffle.file.buffer.kb", "1000")

my spark-env.sh:

ulimit -n 20
SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
SPARK_DRIVER_MEMORY=129G

spark version: 1.1.1

Thank you a lot for your help!

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
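To make the serialized-persist suggestion concrete, here is a minimal sketch of the pair-counting job using StorageLevel.MEMORY_AND_DISK_SER. It is not the poster's code: the body of outputPairsOfWords is a hypothetical stand-in (the thread never shows the real one), counts are kept as Int rather than Short, and the names PairCounts and frequentPairs are made up. It assumes Spark 1.3 or later, where reduceByKey is available on pair RDDs without extra imports; on older versions such as 1.1.1, import org.apache.spark.SparkContext._ would also be needed.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

object PairCounts {

  // Hypothetical stand-in for the poster's outputPairsOfWords: emits each
  // unordered pair of distinct words in a document once, with a count of 1.
  def outputPairsOfWords(doc: String): Seq[((String, String), Int)] = {
    val words = doc.split("\\s+").filter(_.nonEmpty).distinct.sorted
    for {
      i <- words.indices
      j <- (i + 1) until words.length
    } yield ((words(i), words(j)), 1)
  }

  /** Word pairs whose summed count across all documents is at least minCount. */
  def frequentPairs(docs: RDD[String], minCount: Int = 5): RDD[((String, String), Int)] =
    docs
      // Serialized storage level, as suggested in the reply.
      .persist(StorageLevel.MEMORY_AND_DISK_SER)
      .flatMap(outputPairsOfWords)
      .reduceByKey(_ + _)
      .filter { case (_, count) => count >= minCount }
}
```

As the replies point out, the failure may lie in the shuffle rather than in the persisted input, so changing the storage level alone may not resolve it.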