Does Spark consider the free hard drive space of the data nodes?

2017-02-08 Thread Benyi Wang
We are trying to add 6 spare servers to our existing cluster. Those machines have more CPU cores and more memory. Unfortunately, 3 of them can only use 2.5” hard drives, and the total capacity of each of those nodes is about 7TB. The other 3 nodes can only take 3.5” hard drives, but have 48TB each. In addition,

Re: Accessing log for lost executors

2016-12-02 Thread Benyi Wang
Usually your executors were killed by YARN for exceeding their memory limit. You can check the NodeManager's log to see if your application got killed, or use the command "yarn logs -applicationId <appId>" to download the logs. On Thu, Dec 1, 2016 at 10:30 PM, Nisrina Luthfiyati < nisrina.luthfiy...@gmail.com>
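
A minimal Scala sketch of a common follow-up when YARN is killing executors for exceeding memory (this cause is an assumption, not confirmed in the thread; the values are placeholders):

  // Hedged example: raise executor memory and the YARN off-heap overhead.
  // Property names are the Spark 1.x/2.x-on-YARN names; values are placeholders.
  val conf = new org.apache.spark.SparkConf()
    .setAppName("memory-tuning-example")
    .set("spark.executor.memory", "4g")
    .set("spark.yarn.executor.memoryOverhead", "1024") // in MB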

Re: Dataframe broadcast join hint not working

2016-11-26 Thread Benyi Wang
oname#3805,demovalue_etv_map#3806,demoname_etv_map#3807, > demovalue_old_map#3808,map_type#3809] > > > Thanks > Swapnil > > On Sat, Nov 26, 2016 at 2:32 PM, Benyi Wang <bewang.t...@gmail.com> wrote: > >> Could you post the result of explain `c.explain`? If it is broadca

Re: Dataframe broadcast join hint not working

2016-11-26 Thread Benyi Wang
Could you post the result of `c.explain`? If it is a broadcast join, you will see it in the explain output. On Sat, Nov 26, 2016 at 10:51 AM, Swapnil Shinde wrote: > Hello > I am trying a broadcast join on dataframes but it is still doing > SortMergeJoin. I even try
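
A minimal sketch of forcing and verifying a broadcast join; the `small` and `large` DataFrames here are made up, not from the thread (assumes a Spark 2.x spark-shell where spark.implicits._ is available):

  import org.apache.spark.sql.functions.broadcast
  import spark.implicits._

  val large = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "v")
  val small = Seq((1, "x"), (2, "y")).toDF("id", "w")

  // Hint that `small` should be broadcast, then check the physical plan.
  val joined = large.join(broadcast(small), Seq("id"))
  joined.explain()  // a working hint shows BroadcastHashJoin rather than SortMergeJoin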

Does DeserializeToObject mean that a Row is deserialized to Java objects?

2016-11-07 Thread Benyi Wang
Below is my test code using Spark 2.0.1. DeserializeToObject doesn’t appear in filter() but does in map(). Does this mean map() does not use Tungsten? case class Event(id: Long); val e1 = Seq(Event(1L), Event(2L)).toDS; val e2 = Seq(Event(2L), Event(3L)).toDS; e1.filter(e => e.id < 10 && e.id >
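
A hedged reconstruction of the experiment (assuming a Spark 2.0.x spark-shell, where spark.implicits._ is in scope):

  import spark.implicits._

  case class Event(id: Long)
  val e1 = Seq(Event(1L), Event(2L)).toDS()

  // Compare the physical plans: per the observation above, the lambda-based map()
  // introduces DeserializeToObject/SerializeFromObject, while filter() may not.
  e1.filter(e => e.id < 10 && e.id > 0).explain()
  e1.map(e => Event(e.id * 2)).explain()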

Re: classpath conflict with spark internal libraries and the spark shell.

2016-09-09 Thread Benyi Wang
I had a problem when I used "spark.executor.userClassPathFirst" before; I don't remember what the problem was. Alternatively, you can use --driver-class-path and "--conf spark.executor.extraClassPath". Unfortunately you may feel as frustrated as I did when trying to make it work. Depends on how you
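
A minimal sketch of the extraClassPath alternative (the jar path is a placeholder; the driver-side classpath normally has to be supplied at launch, e.g. via --driver-class-path, because the driver JVM is already running when SparkConf is read):

  // Hedged example: prepend a jar to every executor's classpath.
  val conf = new org.apache.spark.SparkConf()
    .setAppName("classpath-example")
    .set("spark.executor.extraClassPath", "/opt/libs/my-dependency.jar") // placeholder path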

Re: The error to read HDFS custom file in spark.

2016-03-19 Thread Benyi Wang
I would say change class RawDataInputFormat[LW <: LongWritable, RD <: RDRawDataRecord] extends FileInputFormat to class RawDataInputFormat[LongWritable, RDRawDataRecord] extends FileInputFormat. On Thu, Mar 17, 2016 at 9:48 AM, Mich Talebzadeh wrote: > Hi Tony, >

Re: How to control the number of files for dynamic partition in Spark SQL?

2016-02-01 Thread Benyi Wang
/ this should use the global > shuffle partition parameter > df eventwkRepartitioned = eventwk.repartition(2) > eventwkRepartitioned.registerTempTable("event_wk_repartitioned") > and use this in your insert statement. > > registering temp table is cheap > > HTH > > > On

How to control the number of files for dynamic partition in Spark SQL?

2016-01-29 Thread Benyi Wang
I want to insert into a partitioned table using dynamic partitioning, but I don’t want to end up with 200 files per partition because the files would be small in my case. sqlContext.sql( """ |insert overwrite table event |partition(eventDate) |select | user, | detail, | eventDate
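
A hedged sketch putting the suggestion from the reply above together with the insert statement from the question (the source table name "event_wk" and the partition count 2 are placeholders, and Hive-side dynamic-partition settings are assumed to be enabled):

  // Repartition to the number of files you want per write, then insert
  // from the repartitioned temp table instead of the original one.
  val eventwk = sqlContext.table("event_wk")   // hypothetical source table name
  eventwk.repartition(2).registerTempTable("event_wk_repartitioned")

  sqlContext.sql(
    """
      |insert overwrite table event
      |partition(eventDate)
      |select user, detail, eventDate
      |from event_wk_repartitioned
    """.stripMargin)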

Re: How to write a custom window function?

2016-01-28 Thread Benyi Wang
Never mind. GenericUDAFCollectList supports struct in 1.3.0. I modified it and it works in a tricky way. I also found an example HiveWindowFunction. On Thu, Jan 28, 2016 at 12:49 PM, Benyi Wang <bewang.t...@gmail.com> wrote: > I'm trying to implement a WindowFunction like collect_li

How to write a custom window function?

2016-01-28 Thread Benyi Wang
I'm trying to implement a WindowFunction like collect_list, but I have to collect a struct, and collect_list works only for primitive types. I think I might modify GenericUDAFCollectList, but I haven't tried it yet. I'm wondering if there is an example showing how to write a custom WindowFunction in
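
For reference, a hedged sketch of what this looks like on later Spark versions (2.x), where collect_list accepts non-primitive columns and can be used over a window; the data and column names here are made up:

  import spark.implicits._
  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.{collect_list, struct}

  val df = Seq((1, "click", 10L), (1, "view", 11L), (2, "click", 12L))
    .toDF("userId", "eventType", "eventTime")

  // Collect (eventType, eventTime) structs per user, ordered by time.
  val w = Window.partitionBy("userId").orderBy("eventTime")
  df.withColumn("events", collect_list(struct("eventType", "eventTime")).over(w)).show(false)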

Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Benyi Wang
- I assume your parquet files are compressed. Gzip or Snappy?
- What Spark version did you use? It seems to be at least 1.4. If you use spark-sql and Tungsten, you might have better performance, but Spark 1.5.2 gave me a wrong result when the data was about 300~400GB, just for a simple

Re: How to merge two large table and remove duplicates?

2016-01-08 Thread Benyi Wang
> the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”. W
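
A hedged sketch of a merge-and-dedup job that applies the quoted knob (paths, table contents, and the partition count are placeholders):

  // Control post-shuffle parallelism, then union the two inputs and drop duplicates.
  sqlContext.setConf("spark.sql.shuffle.partitions", "400")    // placeholder value

  val a = sqlContext.read.parquet("/data/table_a")             // placeholder path
  val b = sqlContext.read.parquet("/data/table_b")             // placeholder path
  a.unionAll(b).dropDuplicates().write.parquet("/data/merged") // placeholder path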

Re: How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread Benyi Wang
DataFrame filterFrame1 = sourceFrame.filter(col("col1").contains("xyz")); DataFrame frameToProcess = sourceFrame.except(filterFrame1); except is really expensive. Do you actually want this: sourceFrame.filter(!col("col1").contains("xyz"))? On Thu, Dec 10, 2015 at 9:57 AM, unk1102
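
A self-contained sketch of the rewrite suggested above; the data is made up, and `sourceFrame` stands in for the poster's DataFrame (assumes spark.implicits._ from a 2.x spark-shell, or sqlContext.implicits._ on 1.x):

  import spark.implicits._
  import org.apache.spark.sql.functions.col

  val sourceFrame = Seq(("abc", 1), ("xyz123", 2), ("def", 3)).toDF("col1", "n")

  // One negated filter instead of a filter followed by an expensive except().
  val rest = sourceFrame.filter(!col("col1").contains("xyz"))
  rest.show()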

Re: How to make this Spark 1.5.2 code fast and shuffle less data

2015-12-10 Thread Benyi Wang
I don't understand this: "I have the following method code which I call it from a thread spawn from spark driver. So in this case 2000 threads ..." Why do you call it from a thread? Are you processing one partition in one thread? On Thu, Dec 10, 2015 at 11:13 AM, Benyi Wang <bewang.t

Re: How to implement zipWithIndex as a UDF?

2015-10-28 Thread Benyi Wang
ct 23, 2015 at 12:44 PM, Michael Armbrust <mich...@databricks.com> wrote: > The user facing type mapping is documented here: > http://spark.apache.org/docs/latest/sql-programming-guide.html#data-types > > On Fri, Oct 23, 2015 at 12:10 PM, Benyi Wang <bewang.t...@gmail.com> > wrote:

How to implement zipWithIndex as a UDF?

2015-10-23 Thread Benyi Wang
If I have two columns StructType(Seq( StructField("id", LongType), StructField("phones", ArrayType(StringType I want to add an index to “phones” before I explode it. Can this be implemented as a GenericUDF? I tried DataFrame.explode. It worked for simple types like string, but I could not
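
One hedged way to do this (not necessarily what the thread settled on): a UDF that pairs each phone with its position, followed by explode; the sample rows are made up. Later Spark versions also ship a built-in posexplode function that covers this case.

  import org.apache.spark.sql.functions.{col, explode, udf}

  val withIndex = udf((xs: Seq[String]) => xs.zipWithIndex)

  val df = sqlContext.createDataFrame(Seq(
    (1L, Seq("555-0100", "555-0101"))
  )).toDF("id", "phones")

  // Each exploded element is a struct with fields _1 (phone) and _2 (index).
  df.withColumn("p", explode(withIndex(col("phones"))))
    .select(col("id"), col("p._1").as("phone"), col("p._2").as("idx"))
    .show()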

MatrixFactorizationModel.save got StackOverflowError

2015-08-13 Thread Benyi Wang
I'm using spark-1.4.1 compiled against CDH 5.3.2. When I use ALS.trainImplicit to build a model, I got this error when rank=40 and iterations=30. It worked for (rank=10, iterations=10) and (rank=20, iterations=20). What was wrong with (rank=40, iterations=30)? 15/08/13 01:16:40 INFO
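
A hedged guess at a common mitigation for StackOverflowError with large rank/iteration counts (not confirmed as the fix in this thread): checkpoint the lineage during training. The `ratings` RDD and the path are placeholders.

  import org.apache.spark.mllib.recommendation.ALS

  // Truncate the long lineage that builds up over many iterations.
  sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // placeholder path
  val model = ALS.trainImplicit(ratings, 40, 30)         // ratings: RDD[Rating], assumed to exist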

Re: Spark Maven Build

2015-08-10 Thread Benyi Wang
will resolve transitive dependencies using the default version 2.2.0. On Fri, Aug 7, 2015 at 8:45 PM, Benyi Wang bewang.t...@gmail.com wrote: I'm trying to build spark 1.4.1 against CDH 5.3.2. I created a profile called cdh5.3.2 in spark_parent.pom, made some changes for sql/hive/v0.13.1

Spark Maven Build

2015-08-07 Thread Benyi Wang
I'm trying to build spark 1.4.1 against CDH 5.3.2. I created a profile called cdh5.3.2 in spark_parent.pom, made some changes for sql/hive/v0.13.1, and the build finished successfully. Here is my problem: - If I run `mvn -Pcdh5.3.2,yarn,hive install`, the artifacts are installed into my

Re: Does Spark automatically run different stages concurrently when possible?

2015-01-10 Thread Benyi Wang
You may try changing the schedulingMode to FAIR; the default is FIFO. Take a look at this page: https://spark.apache.org/docs/1.1.0/job-scheduling.html#scheduling-within-an-application On Sat, Jan 10, 2015 at 10:24 AM, YaoPau jonrgr...@gmail.com wrote: I'm looking for ways to reduce the
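
A minimal sketch of switching to the FAIR scheduler (app name, master, and pool name are placeholders):

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("fair-scheduling-example")
    .setMaster("local[2]")                    // placeholder; normally set by spark-submit
    .set("spark.scheduler.mode", "FAIR")
  val sc = new SparkContext(conf)

  // Optionally route jobs submitted from this thread into a named pool.
  sc.setLocalProperty("spark.scheduler.pool", "pool1")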

HTTP 500 Error for SparkUI in YARN Cluster mode

2014-12-14 Thread Benyi Wang
I got this error when I clicked the "Tracking URL: ApplicationMaster" link while running a spark job in YARN cluster mode. I found this jira https://issues.apache.org/jira/browse/YARN-800, but I could not get the problem fixed. I'm running CDH 5.1.0 with both HDFS and RM HA enabled. Does anybody have the similar

Custom persist or cache of RDD?

2014-11-10 Thread Benyi Wang
When I have a multi-step process flow like this: A -> B -> C -> D -> E -> F, I need to store B's and D's results into parquet files: B.saveAsParquetFile and D.saveAsParquetFile. If I don't cache/persist any step, Spark might recompute from A, B, C, D and E if something goes wrong in F. Of course, I'd better
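
A hedged sketch of the pattern being described, in the Spark 1.1-era SchemaRDD API the message uses (the path is a placeholder; B is the intermediate result from the flow above):

  // Persist the intermediate result to parquet, then read it back and continue
  // the flow from the re-read copy, so a failure in F does not recompute A..E.
  B.saveAsParquetFile("/tmp/pipeline/B.parquet")
  val bFromDisk = sqlContext.parquetFile("/tmp/pipeline/B.parquet")
  // build C from bFromDisk instead of from B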

Best practice for join

2014-11-04 Thread Benyi Wang
I need to join RDD[A], RDD[B], and RDD[C]. Here is what I did, # build (K,V) from A and B to prepare the join val ja = A.map( r => (K1, Va)) val jb = B.map( r => (K1, Vb)) # join A, B val jab = ja.join(jb) # build (K,V) from the joined result of A and B to prepare joining with C val jc =
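
A self-contained toy version of the join chain above; the keys and values are made up:

  import org.apache.spark.SparkContext._   // needed on pre-1.3 Spark for pair-RDD join()

  // Pretend A, B, C have already been mapped to (key, value) pairs.
  val ja = sc.parallelize(Seq((1, "a1"), (2, "a2")))
  val jb = sc.parallelize(Seq((1, "b1"), (2, "b2")))
  val jc = sc.parallelize(Seq((1, "c1")))

  val jab  = ja.join(jb)    // (k, (va, vb))
  val jabc = jab.join(jc)   // (k, ((va, vb), vc))
  jabc.collect().foreach(println)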

Re: Best practice for join

2014-11-04 Thread Benyi Wang
, Benyi Wang bewang.t...@gmail.com wrote: I need to join RDD[A], RDD[B], and RDD[C]. Here is what I did, # build (K,V) from A and B to prepare the join val ja = A.map( r => (K1, Va)) val jb = B.map( r => (K1, Vb)) # join A, B val jab = ja.join(jb) # build (K,V) from the joined result

How to make Spark-sql join using HashJoin

2014-10-06 Thread Benyi Wang
I'm using CDH 5.1.0 with Spark-1.0.0. There is spark-sql-1.0.0 in Cloudera's maven repository. After putting it on the classpath, I can use spark-sql in my application. One issue is that I couldn't make the join a hash join. It gives CartesianProduct when I join two SchemaRDDs as follows:

Spark Language Integrated SQL for join on expression

2014-09-29 Thread Benyi Wang
scala> user res19: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == ParquetTableScan [id#0,name#1], (ParquetRelation /user/hive/warehouse/user), None scala> order res20: org.apache.spark.sql.SchemaRDD = SchemaRDD[72] at RDD at SchemaRDD.scala:98 == Query
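
A hedged alternative if the DSL join keeps producing a CartesianProduct: register the SchemaRDDs and express the equi-join in plain SQL so the planner sees an equality condition. The table and column names below are illustrative, not from the thread (on Spark 1.0 the registration method is registerAsTable; later versions use registerTempTable):

  // user and order are the SchemaRDDs from the session above.
  user.registerAsTable("users")
  order.registerAsTable("orders")
  val joined = sqlContext.sql(
    "select users.id, users.name, orders.order_id from users join orders on users.id = orders.user_id")
  joined.collect().foreach(println)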

How to set KryoRegistrator class in spark-shell

2014-08-20 Thread Benyi Wang
I want to use opencsv's CSVParser to parse CSV lines using a script like the one below in spark-shell: import au.com.bytecode.opencsv.CSVParser; import com.esotericsoftware.kryo.Kryo import org.apache.spark.serializer.KryoRegistrator import org.apache.hadoop.fs.{Path, FileSystem} class MyKryoRegistrator
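
A hedged completion of the truncated registrator class (the body is reconstructed, not the poster's exact code), plus the configuration keys that name it:

  import au.com.bytecode.opencsv.CSVParser
  import com.esotericsoftware.kryo.Kryo
  import org.apache.spark.serializer.KryoRegistrator

  class MyKryoRegistrator extends KryoRegistrator {
    override def registerClasses(kryo: Kryo): Unit = {
      kryo.register(classOf[CSVParser])
    }
  }

  // The registrator is picked up through configuration, e.g.:
  //   spark.serializer       org.apache.spark.serializer.KryoSerializer
  //   spark.kryo.registrator MyKryoRegistrator
  // In spark-shell this normally has to be set at launch, with the class already
  // compiled onto the classpath (a class defined inside the REPL gets a wrapped name).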

Re: How to set KryoRegistrator class in spark-shell

2014-08-20 Thread Benyi Wang
I can do that in my application, but I really want to know how I can do it in spark-shell because I usually prototype in spark-shell before I put the code into an application. On Wed, Aug 20, 2014 at 12:47 PM, Sameer Tilak ssti...@live.com wrote: Hi Wang, Have you tried doing this in your