Re: Spark SQL DataFrame: Nullable column and filtering

2015-08-01 Thread Martin Senne
Dear all,

after some fiddling I have arrived at this solution:

/**
 * Customized left outer join on common column.
 */
def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
  val joinedDF = leftDF.as('left).join(rightDF.as('right),
    leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")

  import joinedDF.sqlContext.implicits._
  val leftColumns = leftDF.columns
    .map((cn: String) => $"left.$cn")
  val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName))
    .map((cn: String) => $"right.$cn")

  joinedDF.select(leftColumns ++ rightColumns: _*)
}
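
A hypothetical usage sketch (sqlContext and its implicits are assumed to be in scope; Record and Mapping are the OtherEntities case classes from the earlier mail quoted below):

import sqlContext.implicits._

val recordDF  = OtherEntities.records.toDF()   // columns: x, a
val mappingDF = OtherEntities.mappings.toDF()  // columns: x, y

// keeps a single x column: result columns are x, a, y
val joined = leftOuterJoinWithRemovalOfEqualColumn(recordDF, mappingDF, "x")
joined.show()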

Comments welcome

Alternatives I tried:

   - Not working: If at least the right alias for rightDF is present, one
   could try

   joinedDF.drop("right." + commonColumnName)

   but this does not work (no column is dropped).
   Unfortunately, drop does not support arguments of type Column /
   ColumnName. *@Michael: Should I create a feature request in Jira for
   drop supporting Columns?*

   - Working: Without using aliases via as(...), but using column
   renaming instead: use

   rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)

   to rename the right DataFrame column, and then express the join criterion as

   leftDF(commonColumnName) === rightDF("right_" + commonColumnName)

   In my opinion not so neat. Opinions? (A sketch of this variant follows below.)
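
A minimal sketch of that renaming variant (not from the original mail, just for illustration; after the join the prefixed column name is unambiguous, so the string-based drop works):

def leftOuterJoinViaRename(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {
  // copy of the join column on the right side under a prefixed, unambiguous name
  val renamed = rightDF.withColumnRenamed(commonColumnName, "right_" + commonColumnName)
  leftDF.join(renamed, leftDF(commonColumnName) === renamed("right_" + commonColumnName), "leftouter")
    .drop("right_" + commonColumnName)
}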


Things I observed:

   - Column handling does not seem consistent:
  - select() supports aliases, while drop( ... ) only supports strings.
  - DataFrame.apply( ... ) and DataFrame.col also do not support aliases.
  - Thus the only way to handle ambiguous column names at the moment is via
  select. Can someone please confirm this?
  - Alias information is not displayed via DataFrame.printSchema (or at
   least I did not find a way to show it).

Cheers,

Martin

2015-07-31 22:51 GMT+02:00 Martin Senne martin.se...@googlemail.com:

 Dear Michael, dear all,

 a minimal example is listed below.

 After some further analysis I figured out that the problem is
 related to the *leftOuterJoinWithRemovalOfEqualColumn* method, as I use
 columns of the left and right DataFrames when doing the select on the
 joined table.

   /**
* Customized left outer join on common column.
*/
   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

     val leftColumns  = leftDF.columns.map((cn: String) => leftDF(cn))
     val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))

     leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
       .select(leftColumns ++ rightColumns: _*)
   }

 As the column y of the right table has nullable=false, this is also
 transferred to the y column of the joined table, because I use rightDF("y").

 Thus, I need to use columns of the joined table for the select.

 *Question now: The joined table has column names x, a, x, y. How do I 
 discard the second x column?*

 All my approaches failed (assuming here that joinedDF is the joined
 DataFrame).


- Using joinedDF.drop("x") discards both x columns.
- Using joinedDF("x") does not work, as it is ambiguous.
- Also using rightDF.as("aliasname") in order to differentiate the
column x (from the left DataFrame) from x (from the right DataFrame) did not
work out, as I found no way to use select($"aliasname.x") really
programmatically. Could someone sketch the code?

 Any help welcome, thanks


 Martin



 
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.{SparkContext, SparkConf}
 import org.apache.spark.sql.{DataFrame, SQLContext}

 object OtherEntities {

   case class Record( x:Int, a: String)
   case class Mapping( x: Int, y: Int )

   val records = Seq( Record(1, "hello"), Record(2, "bob"))
   val mappings = Seq( Mapping(2, 5) )
 }

 object MinimalShowcase {

   /**
* Customized left outer join on common column.
*/
   def leftOuterJoinWithRemovalOfEqualColumn(leftDF: DataFrame, rightDF: DataFrame, commonColumnName: String): DataFrame = {

     val leftColumns  = leftDF.columns.map((cn: String) => leftDF(cn))
     val rightColumns = rightDF.columns.filterNot(cn => cn.equals(commonColumnName)).map(cn => rightDF(cn))

     leftDF.join(rightDF, leftDF(commonColumnName) === rightDF(commonColumnName), "leftouter")
       .select(leftColumns ++ rightColumns: _*)
   }


   /**
* Set, if a column is nullable.
* @param df source DataFrame
* @param cn is the column name to change
* @param nullable is the flag to set, such that the column is either 
 nullable or not
*/
   def setNullableStateOfColumn( df: DataFrame, cn: String, nullable: Boolean) : DataFrame = {

     val schema = df.schema
     val newSchema = StructType(schema.map {
       case StructField( c, t, _, m) if c.equals(cn) => StructField( c, t, nullable, m)
       case y: StructField => y
     })
     df.sqlContext.createDataFrame(df.rdd, newSchema)
   }
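
 A hypothetical usage sketch of the helper above (sc is an existing SparkContext; everything else comes from the objects defined earlier):

 val sqlContext = new SQLContext(sc)
 val mappingDF  = sqlContext.createDataFrame(OtherEntities.mappings)   // y: nullable = false
 val relaxedDF  = setNullableStateOfColumn(mappingDF, "y", nullable = true)
 relaxedDF.printSchema()   // y should now be reported as nullable = true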

No event logs in yarn-cluster mode

2015-08-01 Thread Akmal Abbasov
Hi, I am trying to configure a history server for my application.
When I run it locally (./run-example SparkPi), the event logs are being
created, and I can start the history server.
But when I try
./spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster 
file:///opt/hadoop/spark/examples/src/main/python/pi.py 
file:///opt/hadoop/spark/examples/src/main/python/pi.py
I am getting 
15/08/01 18:18:50 INFO yarn.Client:
 client token: N/A
 diagnostics: N/A
 ApplicationMaster host: 192.168.56.192
 ApplicationMaster RPC port: 0
 queue: default
 start time: 1438445890676
 final status: SUCCEEDED
 tracking URL: http://sp-m1:8088/proxy/application_1438444529840_0009/A
 user: hadoop
15/08/01 18:18:50 INFO util.Utils: Shutdown hook called
15/08/01 18:18:50 INFO util.Utils: Deleting directory 
/tmp/spark-185f7b83-cb3b-4134-a10c-452366204f74
So it succeeded, but there are no event logs for this application.

here are my configs
spark-defaults.conf
spark.master            yarn-cluster
spark.eventLog.dir  /opt/spark/spark-events
spark.eventLog.enabled  true

spark-env.sh
export HADOOP_CONF_DIR=/opt/hadoop/etc/hadoop
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181"
export SPARK_HISTORY_OPTS="-Dspark.history.provider=org.apache.spark.deploy.history.FsHistoryProvider -Dspark.history.fs.logDirectory=file:/opt/spark/spark-events -Dspark.history.fs.cleaner.enabled=true"

Any ideas?

Thank you

About memory leak in spark 1.4.1

2015-08-01 Thread Sea
Hi, all
I upgraded Spark to 1.4.1, and many applications failed... I find that the heap memory is
not full, but the CoarseGrainedExecutorBackend process takes more
memory than I expect, and it keeps increasing as time goes on; finally it exceeds
the limit of the server and the worker dies.


Can anyone help?


Mode: standalone


spark.executor.memory 50g


25583 xiaoju20   0 75.5g  55g  28m S 1729.3 88.1   2172:52 java


55g is more than the 50g I requested.
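
A hedged diagnostic sketch (not from the original mail, values illustrative): if the heap is not full but the executor process keeps growing, the growth is most likely in off-heap (direct) buffers used by the network/shuffle layer rather than in the Java heap. Capping direct memory on the executors makes such growth fail fast instead of silently exceeding the worker's limit:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "50g")
  // cap direct (off-heap) byte buffers; purely a diagnostic setting
  .set("spark.executor.extraJavaOptions", "-XX:MaxDirectMemorySize=4g")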

Re: How does the # of tasks affect # of threads?

2015-08-01 Thread Fabrice Sznajderman
Hello,

I am not an expert with Spark, but the error thrown by Spark seems to indicate
that there is not enough memory for launching the job. By default, Spark allocates
1GB of memory; maybe you should increase it?

Best regards

Fabrice

Le sam. 1 août 2015 à 22:51, Connor Zanin cnnr...@udel.edu a écrit :

 Hello,

 I am having an issue when I run a word count job. I have included the
 source and log files for reference. The job finishes successfully, but
 about halfway through I get a java.lang.OutOfMemoryError (could not create
 native thread), and this leads to the loss of the Executor. After some
 searching I found out this was a problem with the environment and the limit
 by the OS on how many threads I could spawn.

 However, I had thought that Spark only maintained a thread pool equal in
 size to the number of cores available across the nodes (by default), and
 schedules tasks dynamically as threads become available. The only Spark
 parameter I change is the number of partitions in my RDD.

 My question is, how is Spark deciding how many threads to spawn and when?

 --
 Regards,

 Connor Zanin
 Computer Science
 University of Delaware



 --
 Regards,

 Connor Zanin
 Computer Science
 University of Delaware

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


Re: TCP/IP speedup

2015-08-01 Thread Mark Hamstra
https://spark-summit.org/2015/events/making-sense-of-spark-performance/

On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus edel...@gmail.com wrote:

 Hi All!

 How important would be a significant performance improvement to TCP/IP
 itself, in terms of
 overall job performance improvement. Which part would be most
 significantly accelerated?
 Would it be HDFS?

 -- ttfn
 Simon Edelhaus
 California 2015



Re: TCP/IP speedup

2015-08-01 Thread Simon Edelhaus
H

2% huh.


-- ttfn
Simon Edelhaus
California 2015

On Sat, Aug 1, 2015 at 3:45 PM, Mark Hamstra m...@clearstorydata.com
wrote:

 https://spark-summit.org/2015/events/making-sense-of-spark-performance/

 On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus edel...@gmail.com wrote:

 Hi All!

 How important would be a significant performance improvement to TCP/IP
 itself, in terms of
 overall job performance improvement. Which part would be most
 significantly accelerated?
 Would it be HDFS?

 -- ttfn
 Simon Edelhaus
 California 2015





Re: No event logs in yarn-cluster mode

2015-08-01 Thread Marcelo Vanzin
On Sat, Aug 1, 2015 at 9:25 AM, Akmal Abbasov akmal.abba...@icloud.com
wrote:

 When I running locally(./run-example SparkPi), the event logs are being
 created, and I can start history server.
 But when I am trying
 ./spark-submit --class org.apache.spark.examples.SparkPi --master
 yarn-cluster file:///opt/hadoop/spark/examples/src/main/python/pi.py


Did you look for the event log on the machine where the Spark driver ran?
You're using a file: URL, and in yarn-cluster mode the driver runs on some random
machine in the cluster, not on the local machine from which you launched the job.

Which is why you should probably write these logs to HDFS.
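
A sketch of that suggestion (the HDFS path is an assumption): point spark.eventLog.dir at an HDFS directory that already exists, and point the history server's spark.history.fs.logDirectory at the same location.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  // HDFS is reachable no matter which node hosts the driver in yarn-cluster mode
  .set("spark.eventLog.dir", "hdfs:///user/spark/spark-events")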


Re: Does anyone have experience with using Hadoop InputFormats?

2015-08-01 Thread Antsy.Rao


Sent from my iPad

On 2014-9-24, at 8:13 AM, Steve Lewis lordjoe2...@gmail.com wrote:

  When I experimented with an InputFormat I had used in Hadoop for a
 long time, I found
 1) it must extend org.apache.hadoop.mapred.FileInputFormat (the deprecated
 class), not org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 2) initialize needs to be called in the constructor
 3) the types matter - mine was extends FileInputFormat<Text, Text>, but the types
 must not be Hadoop Writables - those are not serializable - whereas extends
 FileInputFormat<StringBuffer, StringBuffer> does work - I don't think this is
 allowed in Hadoop

 Are these statements correct? If so, it seems like most Hadoop InputFormats
 - certainly the custom ones I create - require serious modifications to work.
 Does anyone have samples of using a Hadoop InputFormat?

 Since I am working with problems where a directory with multiple files is
 processed, and some files are many gigabytes in size with multiline complex
 records, an input format is a requirement.
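
 A sketch of how such a format would be consumed from Spark (MyMultilineInputFormat is a hypothetical class extending the old-API org.apache.hadoop.mapred.FileInputFormat[StringBuffer, StringBuffer]; sc is an existing SparkContext and the path is assumed):

 // key/value types are plain serializable classes, not Hadoop Writables
 val rdd = sc.hadoopFile[StringBuffer, StringBuffer, MyMultilineInputFormat](
   "hdfs:///data/multiline-records")
 rdd.map { case (_, value) => value.toString }.take(5).foreach(println)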

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How does the # of tasks affect # of threads?

2015-08-01 Thread Connor Zanin
1. I believe that the default memory (per executor) is 512m (from the
documentation)
2. I have increased the memory used by Spark on the workers in my launch script
when submitting the job
   (--executor-memory 124g)
3. The job completes successfully; it is the road bumps in the middle that I
am concerned with.

I would like insight into how Spark handles thread creation.
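
A hedged aside (not from the thread): besides the one-task-per-core execution threads, each executor runs extra threads for shuffle fetches, Netty I/O, and block management, and "could not create native thread" usually means the OS per-user process limit (ulimit -u) or the memory available for thread stacks was exhausted, not the heap. Raising the OS limit is the usual fix; shrinking the per-thread stack is an alternative sketch (value illustrative only):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // smaller stacks let the same memory budget hold more executor threads
  .set("spark.executor.extraJavaOptions", "-Xss512k")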

On Sat, Aug 1, 2015 at 5:33 PM, Fabrice Sznajderman fab...@gmail.com
wrote:

 Hello,

 I am not an expert with Spark, but the error thrown by Spark seems to
 indicate that there is not enough memory for launching the job. By default, Spark
 allocates 1GB of memory; maybe you should increase it?

 Best regards

 Fabrice

 Le sam. 1 août 2015 à 22:51, Connor Zanin cnnr...@udel.edu a écrit :

 Hello,

 I am having an issue when I run a word count job. I have included the
 source and log files for reference. The job finishes successfully, but
 about halfway through I get a java.lang.OutOfMemoryError (could not create
 native thread), and this leads to the loss of the Executor. After some
 searching I found out this was a problem with the environment and the limit
 by the OS on how many threads I could spawn.

 However, I had thought that Spark only maintained a thread pool equal in
 size to the number of cores available across the nodes (by default), and
 schedules tasks dynamically as threads become available. The only Spark
 parameter I change is the number of partitions in my RDD.

 My question is, how is Spark deciding how many threads to spawn and when?

 --
 Regards,

 Connor Zanin
 Computer Science
 University of Delaware



 --
 Regards,

 Connor Zanin
 Computer Science
 University of Delaware

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Regards,

Connor Zanin
Computer Science
University of Delaware


Re: Spark Number of Partitions Recommendations

2015-08-01 Thread Ruslan Dautkhanov
You should also take into account the amount of memory that you plan to use.
It's advised not to give too much memory to each executor... otherwise GC
overhead will go up.

Btw, why prime numbers?



-- 
Ruslan Dautkhanov

On Wed, Jul 29, 2015 at 3:31 AM, ponkin alexey.pon...@ya.ru wrote:

 Hi Rahul,

 Where did you see such a recommendation?
 I personally define partitions with the following formula

 partitions = nextPrimeNumberAbove( K*(--num-executors * --executor-cores )
 )

 where
 nextPrimeNumberAbove(x) - a prime number which is greater than x
 K - a multiplier; start with 1 and increase until join
 performance starts to degrade
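
 A plain-Scala sketch of that heuristic (nothing here is a Spark API; the
 executor numbers and K are whatever you launch with):

 def nextPrimeNumberAbove(x: Int): Int = {
   def isPrime(n: Int): Boolean = n > 1 && (2 to math.sqrt(n).toInt).forall(n % _ != 0)
   Iterator.from(x + 1).find(isPrime).get
 }

 val numExecutors  = 10  // --num-executors
 val executorCores = 4   // --executor-cores
 val k             = 2   // start at 1, increase until join performance degrades
 val partitions    = nextPrimeNumberAbove(k * numExecutors * executorCores)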




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Number-of-Partitions-Recommendations-tp24022p24059.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




TCP/IP speedup

2015-08-01 Thread Simon Edelhaus
Hi All!

How important would a significant performance improvement to TCP/IP itself be,
in terms of overall job performance? Which part would be most significantly
accelerated?
Would it be HDFS?

-- ttfn
Simon Edelhaus
California 2015


Re: TCP/IP speedup

2015-08-01 Thread Ruslan Dautkhanov
If your network is bandwidth-bound, you'll see that setting jumbo frames (MTU
9000) may increase bandwidth by up to ~20%.

http://docs.hortonworks.com/HDP2Alpha/index.htm#Hardware_Recommendations_for_Hadoop.htm
Enabling Jumbo Frames across the cluster improves bandwidth

If the Spark workload is not network-bandwidth-bound, I'd expect a few
percent to no improvement.



-- 
Ruslan Dautkhanov

On Sat, Aug 1, 2015 at 6:08 PM, Simon Edelhaus edel...@gmail.com wrote:

 H

 2% huh.


 -- ttfn
 Simon Edelhaus
 California 2015

 On Sat, Aug 1, 2015 at 3:45 PM, Mark Hamstra m...@clearstorydata.com
 wrote:

 https://spark-summit.org/2015/events/making-sense-of-spark-performance/

 On Sat, Aug 1, 2015 at 3:24 PM, Simon Edelhaus edel...@gmail.com wrote:

 Hi All!

 How important would be a significant performance improvement to TCP/IP
 itself, in terms of
 overall job performance improvement. Which part would be most
 significantly accelerated?
 Would it be HDFS?

 -- ttfn
 Simon Edelhaus
 California 2015






Re: flatMap output on disk / flatMap memory overhead

2015-08-01 Thread Puneet Kapoor
Hi Octavian,

Just out of curiosity, did you try persisting your RDD in a serialized format,
MEMORY_AND_DISK_SER or MEMORY_ONLY_SER?
i.e. changing your
rdd.persist(MEMORY_AND_DISK)
to
rdd.persist(MEMORY_ONLY_SER)
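
A short sketch of that change (the StorageLevel import is the only addition; Kryo is optional but usually shrinks the serialized form further):

import org.apache.spark.storage.StorageLevel

// serialized storage trades some CPU for a much smaller cached footprint
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
// optional: conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")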

Regards

On Wed, Jun 10, 2015 at 7:27 AM, Imran Rashid iras...@cloudera.com wrote:

 I agree with Richard.  It looks like the issue here is shuffling, and
 shuffle data is always written to disk, so the issue is definitely not that
 all the output of flatMap has to be stored in memory.

 If at all possible, I'd first suggest upgrading to a newer version of Spark
 -- even in 1.2, there were big improvements to shuffle, with sort-based
 shuffle as the default.

 On Tue, Jun 2, 2015 at 1:09 PM, Richard Marscher rmarsc...@localytics.com
  wrote:

 Are you sure it's memory related? What is the disk utilization and IO
 performance on the workers? The error you posted looks to be related to
 shuffle trying to obtain block data from another worker node and failing to
 do so in a reasonable amount of time. It may still be memory related, but I'm
 not sure that other resources are ruled out yet.

 On Tue, Jun 2, 2015 at 5:10 AM, octavian.ganea 
 octavian.ga...@inf.ethz.ch wrote:

 I tried using reduceByKey, without success.

 I also tried this: rdd.persist(MEMORY_AND_DISK).flatMap(...).reduceByKey.
 However, I got the same error as before, namely the error described here:

 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-td23098.html

 My task is to count the frequencies of pairs of words that occur in a
 set of
 documents at least 5 times. I know that this final output is sparse and
 should comfortably fit in memory. However, the intermediate pairs that
 are
 spilled by flatMap might need to be stored on the disk, but I don't
 understand why the persist option does not work and my job fails.

 My code:

 rdd.persist(StorageLevel.MEMORY_AND_DISK)
    .flatMap(x => outputPairsOfWords(x)) // outputs pairs of type ((word1, word2), 1)
    .reduceByKey((a, b) => (a + b).toShort)
    .filter({ case ((x, y), count) => count >= 5 })


 My cluster has 8 nodes, each with 129 GB of RAM and 16 cores. I keep one
 node for the master and 7 nodes for the workers.

 my conf:

 conf.set("spark.cores.max", "128")
 conf.set("spark.akka.frameSize", "1024")
 conf.set("spark.executor.memory", "115g")
 conf.set("spark.shuffle.file.buffer.kb", "1000")

 my spark-env.sh:
  ulimit -n 20
  SPARK_JAVA_OPTS="-Xss1g -Xmx129g -d64 -XX:-UseGCOverheadLimit -XX:-UseCompressedOops"
  SPARK_DRIVER_MEMORY=129G

 spark version: 1.1.1

 Thank you a lot for your help!



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/flatMap-output-on-disk-flatMap-memory-overhead-tp23098p23108.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org