Re: Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-14 Thread Hao Ren
Thank you, Subash. It works!

On Tue, Aug 13, 2019 at 5:58 AM Subash Prabakar 
wrote:

> I had a similar issue reading an external parquet table. In my case I had a
> permission issue on one partition, so I added a filter to exclude that
> partition, but Spark still didn't prune it. Then I read that, in order for
> Spark to be aware of all the partitions, it first reads the folders and then
> updates its metastore; the SQL is applied on top of that, instead of using
> the existing Hive SerDe. This behaviour only applies to Parquet files.
>
> Hive metastore Parquet table conversion
> <https://spark.apache.org/docs/2.3.0/sql-programming-guide.html#hive-metastore-parquet-table-conversion>
>
> When reading from and writing to Hive metastore Parquet tables, Spark SQL
> will try to use its own Parquet support instead of Hive SerDe for better
> performance. This behavior is controlled by the
> spark.sql.hive.convertMetastoreParquet configuration, and is turned on by
> default.
>
> Reference:
> https://spark.apache.org/docs/2.3.0/sql-programming-guide.html
>
> Set the above property to false. It should work.
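
A minimal sketch of applying that setting from Scala (assuming a SparkSession `spark` built with enableHiveSupport(), as discussed below; the setting can also be passed with --conf at launch):

```
// Fall back to the Hive SerDe path instead of Spark's built-in Parquet reader,
// so the metastore's partition information is used; a sketch, not tested here.
spark.conf.set("spark.sql.hive.convertMetastoreParquet", "false")

spark.sql("SELECT COUNT(*) FROM raw_logs.fact_request WHERE day = '2019-08-01'").show()
```
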
>
> If anyone has a better explanation, please let me know - I have the same
> question. Why does only Parquet have this problem?
>
> Thanks
> Subash
>
> On Fri, 9 Aug 2019 at 16:18, Hao Ren  wrote:
>
>> Hi Mich,
>>
>> Thank you for your reply.
>> I need to be more clear about the environment. I am using spark-shell to
>> run the query.
>> Actually, the query works even without core-site, hdfs-site being under
>> $SPARK_HOME/conf.
>> My problem is efficiency: all of the partitions were scanned instead of
>> the one in question during the execution of the Spark SQL query.
>> This is why this simple query takes too much time.
>> I would like to know how to improve this by just reading the specific
>> partition in question.
>>
>> Feel free to ask more questions if I am not clear.
>>
>> Best regards,
>> Hao
>>
>> On Thu, Aug 8, 2019 at 9:05 PM Mich Talebzadeh 
>> wrote:
>>
>>> You also need the others as well, using soft links (as shown by ls -l):
>>>
>>> cd $SPARK_HOME/conf
>>>
>>> hive-site.xml -> ${HIVE_HOME}/conf/hive-site.xml
>>> core-site.xml -> ${HADOOP_HOME}/etc/hadoop/core-site.xml
>>> hdfs-site.xml -> ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
>>>
>>> Dr Mich Talebzadeh
>>>
>>>
>>>
>>> LinkedIn * 
>>> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>>>
>>>
>>>
>>> http://talebzadehmich.wordpress.com
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Thu, 8 Aug 2019 at 15:16, Hao Ren  wrote:
>>>
>>>>
>>>>
>>>> -- Forwarded message -
>>>> From: Hao Ren 
>>>> Date: Thu, Aug 8, 2019 at 4:15 PM
>>>> Subject: Re: Spark SQL reads all leaf directories on a partitioned Hive
>>>> table
>>>> To: Gourav Sengupta 
>>>>
>>>>
>>>> Hi Gourva,
>>>>
>>>> I am using enableHiveSupport.
>>>> The table was not created by Spark. The table already exists in Hive.
>>>> All I did was read it with a SQL query in Spark.
>>>> FYI, I put hive-site.xml in the spark/conf/ directory to make sure that
>>>> Spark can access Hive.
>>>>
>>>> Hao
>>>>
>>>> On Thu, Aug 8, 2019 at 1:24 PM Gourav Sengupta <
>>>> gourav.sengu...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Just out of curiosity did you start the SPARK session using
>>>>> enableHiveSupport() ?
>>>>>
>>>>> Or are you creating the table using SPARK?
>>>>>
>>>>>
>>>>> Regards,
>>>>> Gourav
>>>>>
>>>>> On Wed, Aug 7, 2019 at 3:28 PM Hao Ren  wrote:
>>>>>
>>>>>> Hi,
>>>>>> I am using Spark SQL 2.3.3 to read a hive table which is partitioned by
>>>>>> day, hour, platform, request_status and is_sampled.

Re: Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-09 Thread Hao Ren
Hi Mich,

Thank you for your reply.
I need to be more clear about the environment. I am using spark-shell to
run the query.
Actually, the query works even without core-site, hdfs-site being under
$SPARK_HOME/conf.
My problem is efficiency: all of the partitions were scanned instead of
the one in question during the execution of the Spark SQL query.
This is why this simple query takes too much time.
I would like to know how to improve this by just reading the specific
partition in question.

Feel free to ask more questions if I am not clear.

Best regards,
Hao

On Thu, Aug 8, 2019 at 9:05 PM Mich Talebzadeh 
wrote:

> You also need the others as well, using soft links (as shown by ls -l):
>
> cd $SPARK_HOME/conf
>
> hive-site.xml -> ${HIVE_HOME}/conf/hive-site.xml
> core-site.xml -> ${HADOOP_HOME}/etc/hadoop/core-site.xml
> hdfs-site.xml -> ${HADOOP_HOME}/etc/hadoop/hdfs-site.xml
>
> Dr Mich Talebzadeh
>
>
>
> LinkedIn * 
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
> <https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw>*
>
>
>
> http://talebzadehmich.wordpress.com
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Thu, 8 Aug 2019 at 15:16, Hao Ren  wrote:
>
>>
>>
>> -- Forwarded message -
>> From: Hao Ren 
>> Date: Thu, Aug 8, 2019 at 4:15 PM
>> Subject: Re: Spark SQL reads all leaf directories on a partitioned Hive
>> table
>> To: Gourav Sengupta 
>>
>>
>> Hi Gourva,
>>
>> I am using enableHiveSupport.
>> The table was not created by Spark. The table already exists in Hive. All
>> I did was read it with a SQL query in Spark.
>> FYI, I put hive-site.xml in the spark/conf/ directory to make sure that Spark
>> can access Hive.
>>
>> Hao
>>
>> On Thu, Aug 8, 2019 at 1:24 PM Gourav Sengupta 
>> wrote:
>>
>>> Hi,
>>>
>>> Just out of curiosity did you start the SPARK session using
>>> enableHiveSupport() ?
>>>
>>> Or are you creating the table using SPARK?
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Wed, Aug 7, 2019 at 3:28 PM Hao Ren  wrote:
>>>
>>>> Hi,
>>>> I am using Spark SQL 2.3.3 to read a hive table which is partitioned by
>>>> day, hour, platform, request_status and is_sampled. The underlying data is
>>>> in parquet format on HDFS.
>>>> Here is the SQL query to read just *one partition*.
>>>>
>>>> ```
>>>> spark.sql("""
>>>> SELECT rtb_platform_id, SUM(e_cpm)
>>>> FROM raw_logs.fact_request
>>>> WHERE day = '2019-08-01'
>>>> AND hour = '00'
>>>> AND platform = 'US'
>>>> AND request_status = '3'
>>>> AND is_sampled = 1
>>>> GROUP BY rtb_platform_id
>>>> """).show
>>>> ```
>>>>
>>>> However, from the Spark web UI, the stage description shows:
>>>>
>>>> ```
>>>> Listing leaf files and directories for 201616 paths:
>>>> viewfs://root/user/bilogs/logs/fact_request/day=2018-08-01/hour=11/platform=AS/request_status=0/is_sampled=0,
>>>> ...
>>>> ```
>>>>
>>>> It seems the job is reading all of the partitions of the table and the
>>>> job takes too long for just one partition. One workaround is using
>>>> `spark.read.parquet` API to read parquet files directly. Spark has
>>>> partition-awareness for partitioned directories.
>>>>
>>>> But still, I would like to know if there is a way to leverage
>>>> partition-awareness via Hive by using `spark.sql` API?
>>>>
>>>> Any help is highly appreciated!
>>>>
>>>> Thank you.
>>>>
>>>> --
>>>> Hao Ren
>>>>
>>>
>>
>> --
>> Hao Ren
>>
>> Software Engineer in Machine Learning @ Criteo
>>
>> Paris, France
>>
>>
>> --
>> Hao Ren
>>
>> Software Engineer in Machine Learning @ Criteo
>>
>> Paris, France
>>
>

-- 
Hao Ren

Software Engineer in Machine Learning @ Criteo

Paris, France


Fwd: Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-08 Thread Hao Ren
-- Forwarded message -
From: Hao Ren 
Date: Thu, Aug 8, 2019 at 4:15 PM
Subject: Re: Spark SQL reads all leaf directories on a partitioned Hive
table
To: Gourav Sengupta 


Hi Gourva,

I am using enableHiveSupport.
The table was not created by Spark. The table already exists in Hive. All I
did was read it with a SQL query in Spark.
FYI, I put hive-site.xml in the spark/conf/ directory to make sure that Spark
can access Hive.

Hao

On Thu, Aug 8, 2019 at 1:24 PM Gourav Sengupta 
wrote:

> Hi,
>
> Just out of curiosity did you start the SPARK session using
> enableHiveSupport() ?
>
> Or are you creating the table using SPARK?
>
>
> Regards,
> Gourav
>
> On Wed, Aug 7, 2019 at 3:28 PM Hao Ren  wrote:
>
>> Hi,
>> I am using Spark SQL 2.3.3 to read a hive table which is partitioned by
>> day, hour, platform, request_status and is_sampled. The underlying data is
>> in parquet format on HDFS.
>> Here is the SQL query to read just *one partition*.
>>
>> ```
>> spark.sql("""
>> SELECT rtb_platform_id, SUM(e_cpm)
>> FROM raw_logs.fact_request
>> WHERE day = '2019-08-01'
>> AND hour = '00'
>> AND platform = 'US'
>> AND request_status = '3'
>> AND is_sampled = 1
>> GROUP BY rtb_platform_id
>> """).show
>> ```
>>
>> However, from the Spark web UI, the stage description shows:
>>
>> ```
>> Listing leaf files and directories for 201616 paths:
>> viewfs://root/user/bilogs/logs/fact_request/day=2018-08-01/hour=11/platform=AS/request_status=0/is_sampled=0,
>> ...
>> ```
>>
>> It seems the job is reading all of the partitions of the table and the
>> job takes too long for just one partition. One workaround is using
>> `spark.read.parquet` API to read parquet files directly. Spark has
>> partition-awareness for partitioned directories.
>>
>> But still, I would like to know if there is a way to leverage
>> partition-awareness via Hive by using `spark.sql` API?
>>
>> Any help is highly appreciated!
>>
>> Thank you.
>>
>> --
>> Hao Ren
>>
>

-- 
Hao Ren

Software Engineer in Machine Learning @ Criteo

Paris, France


-- 
Hao Ren

Software Engineer in Machine Learning @ Criteo

Paris, France


Spark SQL reads all leaf directories on a partitioned Hive table

2019-08-07 Thread Hao Ren
Hi,
I am using Spark SQL 2.3.3 to read a hive table which is partitioned by
day, hour, platform, request_status and is_sampled. The underlying data is
in parquet format on HDFS.
Here is the SQL query to read just *one partition*.

```
spark.sql("""
SELECT rtb_platform_id, SUM(e_cpm)
FROM raw_logs.fact_request
WHERE day = '2019-08-01'
AND hour = '00'
AND platform = 'US'
AND request_status = '3'
AND is_sampled = 1
GROUP BY rtb_platform_id
""").show
```

However, from the Spark web UI, the stage description shows:

```
Listing leaf files and directories for 201616 paths:
viewfs://root/user/bilogs/logs/fact_request/day=2018-08-01/hour=11/platform=AS/request_status=0/is_sampled=0,
...
```

It seems the job is reading all of the partitions of the table and the job
takes too long for just one partition. One workaround is using
`spark.read.parquet` API to read parquet files directly. Spark has
partition-awareness for partitioned directories.
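
For reference, a minimal sketch of that `spark.read.parquet` workaround (the basePath option keeps day/hour/... available as columns; the path is illustrative, based on the listing above):

```
import org.apache.spark.sql.functions.sum

// Read only the wanted partition directory; a sketch, assuming a SparkSession `spark`.
val base = "viewfs://root/user/bilogs/logs/fact_request"
val onePartition = spark.read
  .option("basePath", base)
  .parquet(s"$base/day=2019-08-01/hour=00/platform=US/request_status=3/is_sampled=1")

onePartition.groupBy("rtb_platform_id").agg(sum("e_cpm")).show()
```
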

But still, I would like to know if there is a way to leverage
partition-awareness via Hive by using `spark.sql` API?

Any help is highly appreciated!

Thank you.

-- 
Hao Ren


[Spark Streaming] map and window operation on DStream only process one batch

2016-11-22 Thread Hao Ren
Spark Streaming v 1.6.2
Kafka v0.10.1

I am reading msgs from Kafka.
What surprised me is the following DStream only process the first batch.

KafkaUtils.createDirectStream[
  String,
  String,
  StringDecoder,
  StringDecoder](streamingContext, kafkaParams, Set(topic))
  .map(_._2)
  .window(Seconds(windowLengthInSec))

Some logs as below are endlessly repeated:

16/11/22 14:20:40 INFO MappedDStream: Slicing from 1479820835000 ms to
147982084 ms (aligned to 1479820835000 ms and 147982084 ms)
16/11/22 14:20:40 INFO JobScheduler: Added jobs for time 147982084 ms

And the action on the DStream is just a rdd count

windowedStream foreachRDD { rdd => rdd.count }

From the webUI, only the first batch is in status: Processing, the others
are all Queued.
However, if I permute map and window operation, everything is ok.

KafkaUtils.createDirectStream[
  String,
  String,
  StringDecoder,
  StringDecoder](streamingContext, kafkaParams, Set(topic))
  .window(Seconds(windowLengthInSec))
  .map(_._2)

I think the two are equivalent. But they are not.

Furthermore, if I replace my KafkaDStream with a QueueStream, it works no
matter the order of the map and window operations.

I am not sure whether this is related to KafkaDStream or to DStream in general.

Any help is appreciated.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: [SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-08 Thread Hao Ren
Yes, it is.
You can define a udf like that.
Basically, it's a udf Int => Int whose closure contains a non-serializable
object.
The latter should cause a Task not serializable exception.

Hao

On Mon, Aug 8, 2016 at 5:08 AM, Muthu Jayakumar <bablo...@gmail.com> wrote:

> Hello Hao Ren,
>
> Doesn't the code...
>
> val add = udf {
>   (a: Int) => a + notSer.value
> }
> Mean UDF function that Int => Int ?
>
> Thanks,
> Muthu
>
> On Sun, Aug 7, 2016 at 2:31 PM, Hao Ren <inv...@gmail.com> wrote:
>
>> I am playing with spark 2.0
>> What I tried to test is:
>>
>> Create a UDF in which there is a non-serializable object.
>> What I expected is that when this UDF is called while materializing the
>> dataFrame where the UDF is used in "select", a Task not serializable
>> exception should be thrown.
>> It also depends on which "action" is called on that dataframe.
>>
>> Here is the code for reproducing the pb:
>>
>> 
>> object DataFrameSerDeTest extends App {
>>
>>   class A(val value: Int) // It is not serializable
>>
>>   def run() = {
>> val spark = SparkSession
>>   .builder()
>>   .appName("DataFrameSerDeTest")
>>   .master("local[*]")
>>   .getOrCreate()
>>
>> import org.apache.spark.sql.functions.udf
>> import spark.sqlContext.implicits._
>>
>> val notSer = new A(2)
>> val add = udf {
>>   (a: Int) => a + notSer.value
>> }
>> val df = spark.createDataFrame(Seq(
>>   (1, 2),
>>   (2, 2),
>>   (3, 2),
>>   (4, 2)
>> )).toDF("key", "value")
>>   .select($"key", add($"value").as("added"))
>>
>> df.show() // *It should not work because the udf contains a
>> non-serializable object, but it works*
>>
>> df.filter($"key" === 2).show() // *It does not work as expected
>> (org.apache.spark.SparkException: Task not serializable)*
>>   }
>>
>>   run()
>> }
>> 
>>
>> Also, I tried collect(), count(), first(), limit(). All of them worked
>> without non-serializable exceptions.
>> It seems only filter() throws the exception. (feature or bug ?)
>>
>> Any ideas ? Or I just messed things up ?
>> Any help is highly appreciated.
>>
>> --
>> Hao Ren
>>
>> Data Engineer @ leboncoin
>>
>> Paris, France
>>
>
>


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


[SPARK-2.0][SQL] UDF containing non-serializable object does not work as expected

2016-08-07 Thread Hao Ren
I am playing with spark 2.0
What I tried to test is:

Create a UDF in which there is a non-serializable object.
What I expected is that when this UDF is called while materializing the
dataFrame where the UDF is used in "select", a Task not serializable
exception should be thrown.
It also depends on which "action" is called on that dataframe.

Here is the code for reproducing the pb:


object DataFrameSerDeTest extends App {

  class A(val value: Int) // It is not serializable

  def run() = {
val spark = SparkSession
  .builder()
  .appName("DataFrameSerDeTest")
  .master("local[*]")
  .getOrCreate()

import org.apache.spark.sql.functions.udf
import spark.sqlContext.implicits._

val notSer = new A(2)
val add = udf {
  (a: Int) => a + notSer.value
}
val df = spark.createDataFrame(Seq(
  (1, 2),
  (2, 2),
  (3, 2),
  (4, 2)
)).toDF("key", "value")
  .select($"key", add($"value").as("added"))

df.show() // *It should not work because the udf contains a
non-serializable object, but it works*

df.filter($"key" === 2).show() // *It does not work as expected
(org.apache.spark.SparkException: Task not serializable)*
  }

  run()
}


Also, I tried collect(), count(), first(), limit(). All of them worked
without non-serializable exceptions.
It seems only filter() throws the exception. (feature or bug ?)

Any ideas? Or did I just mess things up?
Any help is highly appreciated.
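
For reference, here is a minimal sketch of the usual way to keep such a UDF serializable, by capturing only the needed field rather than the object itself (it reuses notSer, udf and the implicits from the snippet above; it is not an explanation of why show() currently succeeds while filter() fails):

```
// Capture only the serializable value in the closure, not the non-serializable instance.
val plainValue: Int = notSer.value                 // extracted on the driver; an Int serializes fine
val addSafe = udf { (a: Int) => a + plainValue }

val dfSafe = spark.createDataFrame(Seq((1, 2), (2, 2), (3, 2), (4, 2)))
  .toDF("key", "value")
  .select($"key", addSafe($"value").as("added"))

dfSafe.filter($"key" === 2).show()                 // no Task not serializable expected here
```
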

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


[MLlib] Term Frequency in TF-IDF seems incorrect

2016-08-01 Thread Hao Ren
When computing term frequency, we can use either the HashingTF or the
CountVectorizer feature extractor.
However, both of them just use the number of times that a term appears in a
document.
That is not a true frequency. Actually, it should be divided by the length of
the document.

Is this a wanted feature ?
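
For what it's worth, a minimal sketch of normalizing the counts by document length on the user side (assuming an RDD[Seq[String]] named docs; the names are illustrative):

```
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.linalg.{SparseVector, Vectors}

// Divide each raw term count by the document length to obtain a true term frequency.
val hashingTF = new HashingTF()
val normalizedTF = docs.map { terms =>
  val counts = hashingTF.transform(terms).asInstanceOf[SparseVector]
  val docLength = terms.size.toDouble
  Vectors.sparse(counts.size, counts.indices, counts.values.map(_ / docLength))
}
```
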

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: SQLContext and HiveContext parse a query string differently ?

2016-05-13 Thread Hao Ren
Basically, I want to run the following query:

select 'a\'b', cast(null as Array<String>)

However, neither HiveContext nor SQLContext can execute it without an
exception.

I have tried

sql("select 'a\'b', cast(null as Array<String>)")

and

df.selectExpr("'a\'b'", "cast(null as Array<String>)")

Neither of them works.

From the exceptions, I find the query is parsed differently.



On Fri, May 13, 2016 at 8:00 AM, Yong Zhang <java8...@hotmail.com> wrote:

> Not sure what you mean? You want to have exactly one query running fine
> in both sqlContext and HiveContext? The query parsers are different, why do
> you want to have this feature? Do I understand your question correctly?
>
> Yong
>
> --
> Date: Thu, 12 May 2016 13:09:34 +0200
> Subject: SQLContext and HiveContext parse a query string differently ?
> From: inv...@gmail.com
> To: user@spark.apache.org
>
>
> HI,
>
> I just want to figure out why the two contexts behave differently even
> on a simple query.
> In a nutshell, I have a query in which there is a String containing a single
> quote and a cast to Array/Map.
> I have tried all the combinations of the different types of SQL context and
> query-call APIs (sql, df.select, df.selectExpr).
> I can't find one that rules them all.
>
> Here is the code for reproducing the problem.
>
> -
>
> import org.apache.spark.sql.SQLContext
> import org.apache.spark.sql.hive.HiveContext
> import org.apache.spark.{SparkConf, SparkContext}
>
> object Test extends App {
>
>   val sc  = new SparkContext("local[2]", "test", new SparkConf)
>   val hiveContext = new HiveContext(sc)
>   val sqlContext  = new SQLContext(sc)
>
>   val context = hiveContext
>   //  val context = sqlContext
>
>   import context.implicits._
>
>   val df = Seq((Seq(1, 2), 2)).toDF("a", "b")
>   df.registerTempTable("tbl")
>   df.printSchema()
>
>   // case 1
>   context.sql("select cast(a as array<string>) from tbl").show()
>   // HiveContext => org.apache.spark.sql.AnalysisException: cannot recognize 
> input near 'array' '<' 'string' in primitive type specification; line 1 pos 17
>   // SQLContext => OK
>
>   // case 2
>   context.sql("select 'a\\'b'").show()
>   // HiveContext => OK
>   // SQLContext => failure: ``union'' expected but ErrorToken(unclosed string 
> literal) found
>
>   // case 3
>   df.selectExpr("cast(a as array<string>)").show() // OK with HiveContext and 
> SQLContext
>
>   // case 4
>   df.selectExpr("'a\\'b'").show() // HiveContext, SQLContext => failure: end 
> of input expected
> }
>
> -
>
> Any clarification / workaround is highly appreciated.
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>



-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


SQLContext and HiveContext parse a query string differently ?

2016-05-12 Thread Hao Ren
HI,

I just want to figure out why the two contexts behave differently even on
a simple query.
In a nutshell, I have a query in which there is a String containing a single
quote and a cast to Array/Map.
I have tried all the combinations of the different types of SQL context and
query-call APIs (sql, df.select, df.selectExpr).
I can't find one that rules them all.

Here is the code for reproducing the problem.
-

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Test extends App {

  val sc  = new SparkContext("local[2]", "test", new SparkConf)
  val hiveContext = new HiveContext(sc)
  val sqlContext  = new SQLContext(sc)

  val context = hiveContext
  //  val context = sqlContext

  import context.implicits._

  val df = Seq((Seq(1, 2), 2)).toDF("a", "b")
  df.registerTempTable("tbl")
  df.printSchema()

  // case 1
  context.sql("select cast(a as array<string>) from tbl").show()
  // HiveContext => org.apache.spark.sql.AnalysisException: cannot
recognize input near 'array' '<' 'string' in primitive type
specification; line 1 pos 17
  // SQLContext => OK

  // case 2
  context.sql("select 'a\\'b'").show()
  // HiveContext => OK
  // SQLContext => failure: ``union'' expected but ErrorToken(unclosed
string literal) found

  // case 3
  df.selectExpr("cast(a as array<string>)").show() // OK with
HiveContext and SQLContext

  // case 4
  df.selectExpr("'a\\'b'").show() // HiveContext, SQLContext =>
failure: end of input expected
}

---------

Any clarification / workaround is highly appreciated.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: Can not kill driver properly

2016-03-21 Thread Hao Ren
Update:

I am using --supervise flag for fault tolerance.



On Mon, Mar 21, 2016 at 4:16 PM, Hao Ren <inv...@gmail.com> wrote:

> Using spark 1.6.1
> Spark Streaming Jobs are submitted via spark-submit (cluster mode)
>
> I tried to kill drivers via the web UI, but it does not work. These drivers are
> still running.
> I also tried:
> 1. spark-submit --master  --kill 
> 2. ./bin/spark-class org.apache.spark.deploy.Client kill 
> 
>
> Neither works. The workaround is to ssh to the driver node, then kill -9
> ...
> jps shows the same classname DriverWrapper, so I need to pick carefully...
>
> Any idea why this happens ?
> BTW, my streaming job's batch duration is one hour. So do we need to wait
> for job processing to finish before killing the driver?
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>



-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Can not kill driver properly

2016-03-21 Thread Hao Ren
Using spark 1.6.1
Spark Streaming Jobs are submitted via spark-submit (cluster mode)

I tried to kill drivers via the web UI, but it does not work. These drivers are
still running.
I also tried:
1. spark-submit --master  --kill 
2. ./bin/spark-class org.apache.spark.deploy.Client kill 


Neither works. The workaround is to ssh to the driver node, then kill -9 ...
jps shows the same classname DriverWrapper, so I need to pick carefully...

Any idea why this happens ?
BTW, my streaming job's batch duration is one hour. So do we need to wait
for job processing to finish before killing the driver?

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


[Streaming] textFileStream has no events shown in web UI

2016-03-16 Thread Hao Ren
Just a quick question,

When using textFileStream, I did not see any events in the web UI.
Actually, I am uploading files to S3 every 5 seconds,
and the mini-batch duration is 30 seconds.
On the web UI:

 *Input Rate*
Avg: 0.00 events/sec

But the scheduling time and processing time are correct, and the output of
the stream is also correct. Not sure why the web UI has not detected any events.

Thank you.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: [Streaming] Difference between windowed stream and stream with large batch size?

2016-03-16 Thread Hao Ren
Any ideas ?

Feel free to ask me more details, if my questions are not clear.

Thank you.

On Mon, Mar 7, 2016 at 3:38 PM, Hao Ren <inv...@gmail.com> wrote:

> I want to understand the advantage of using windowed stream.
>
> For example,
>
> Stream 1:
> initial duration = 5 s,
> and then transformed into a stream windowed by (*windowLength = *30s, 
> *slideInterval
> = *30s)
>
> Stream 2:
> Duration = 30 s
>
> Questions:
>
> 1. Is Stream 1 equivalent to Stream 2 on behavior ? Do users observe the
> same result ?
> 2. If yes, what is the advantage of one vs. another ? Performance or
> something else ?
> 3. Is a stream with large batch reasonable , say 30 mins or even an hour ?
>
> Thank you.
>
> --
> Hao Ren
>
> Data Engineer @ leboncoin
>
> Paris, France
>



-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


[Streaming] Difference between windowed stream and stream with large batch size?

2016-03-07 Thread Hao Ren
I want to understand the advantage of using windowed stream.

For example,

Stream 1:
initial duration = 5 s,
and then transformed into a stream windowed by (windowLength = 30s,
slideInterval = 30s)

Stream 2:
Duration = 30 s
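
A minimal sketch of the two setups described above (assuming lines5s and lines30s are DStreams built on contexts with 5 s and 30 s batch durations respectively; the names are illustrative):

```
import org.apache.spark.streaming.Seconds

// Stream 1: 5 s batches, regrouped into non-overlapping 30 s windows.
val stream1 = lines5s.window(Seconds(30), Seconds(30))
stream1.foreachRDD(rdd => println(rdd.count()))

// Stream 2: the context itself uses a 30 s batch duration, no window needed.
lines30s.foreachRDD(rdd => println(rdd.count()))
```
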

Questions:

1. Is Stream 1 equivalent to Stream 2 in behavior? Do users observe the
same result?
2. If yes, what is the advantage of one vs. the other? Performance or
something else?
3. Is a stream with a large batch duration reasonable, say 30 minutes or even an hour?

Thank you.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Unresolved dep when building project with spark 1.6

2016-02-29 Thread Hao Ren
Hi,

I am upgrading my project to spark 1.6.
It seems that the deps are broken.

Deps used in sbt

val scalaVersion = "2.10"
val sparkVersion  = "1.6.0"
val hadoopVersion = "2.7.1"

// Libraries
val scalaTest = "org.scalatest" %% "scalatest" % "2.2.4" % "test"
val sparkSql  = "org.apache.spark" %% "spark-sql" % sparkVersion
val sparkML   = "org.apache.spark" %% "spark-mllib" % sparkVersion
val hadoopAWS = "org.apache.hadoop" % "hadoop-aws" % hadoopVersion
val scopt = "com.github.scopt" %% "scopt" % "3.3.0"
val jodacvt   = "org.joda" % "joda-convert" % "1.8.1"

Sbt exception:

[warn] ::
[warn] ::  UNRESOLVED DEPENDENCIES ::
[warn] ::
[warn] :: org.fusesource.leveldbjni#leveldbjni-all;1.8:
org.fusesource.leveldbjni#leveldbjni-all;1.8!leveldbjni-all.pom(pom.original)
origin location must be absolute:
file:/home/invkrh/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.pom
[warn] ::
[warn]
[warn] Note: Unresolved dependencies path:
[warn] org.fusesource.leveldbjni:leveldbjni-all:1.8
[warn]  +- org.apache.spark:spark-network-shuffle_2.10:1.6.0
[warn]  +- org.apache.spark:spark-core_2.10:1.6.0
[warn]  +- org.apache.spark:spark-catalyst_2.10:1.6.0
[warn]  +- org.apache.spark:spark-sql_2.10:1.6.0
(/home/invkrh/workspace/scala/confucius/botdet/build.sbt#L14)
[warn]  +- org.apache.spark:spark-mllib_2.10:1.6.0
(/home/invkrh/workspace/scala/confucius/botdet/build.sbt#L14)
[warn]  +- fr.leboncoin:botdet_2.10:0.1
sbt.ResolveException: unresolved dependency:
org.fusesource.leveldbjni#leveldbjni-all;1.8:
org.fusesource.leveldbjni#leveldbjni-all;1.8!leveldbjni-all.pom(pom.original)
origin location must be absolute:
file:/home/invkrh/.m2/repository/org/fusesource/leveldbjni/leveldbjni-all/1.8/leveldbjni-all-1.8.pom

Thank you.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


ClosureCleaner does not work for java code

2015-08-10 Thread Hao Ren

It seems that, in order to serialize the anonymous class
`test.AbstractTest$1` (ops), it serializes `test.AbstractTest` first,
which should not be serialized.

The difference is in the type of RDD: in the Java code, JavaRDD is used. I am
wondering whether the ClosureCleaner does not work well with JavaRDD.
According to the Spark code, JavaRDD apparently uses the Scala API:

def map[R](f: JFunction[T, R]): JavaRDD[R] =
  new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag)

You can reproduce this issue easily, any help is appreciated.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


How to distribute non-serializable object in transform task or broadcast ?

2015-08-07 Thread Hao Ren
Is there any workaround to distribute a non-serializable object for an RDD
transformation or a broadcast variable?

Say I have an object of class C which is not serializable. Class C is in a
jar package, and I have no control over it. Now I need to distribute it either
by an RDD transformation or by a broadcast.

I tried to subclass class C with the Serializable interface. It works for
serialization, but deserialization does not, since there is no
parameter-less constructor for class C and deserialization breaks
with an invalid constructor exception.

I think it's a common use case. Any help is appreciated.
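
For reference, one common workaround is to ship a serializable recipe for building the object and construct it lazily on the executors, instead of shipping the instance itself. A minimal sketch, assuming C can be built from serializable arguments (the wrapper is illustrative, not part of any Spark API):

```
// Ship a serializable factory; the wrapped object is rebuilt lazily wherever the
// wrapper is deserialized (broadcast the wrapper to build it once per executor).
class Lazily[T](factory: () => T) extends Serializable {
  @transient lazy val value: T = factory()
}

// Usage sketch:
// val lazyC = new Lazily(() => new C(/* serializable args */))
// rdd.map(record => lazyC.value.process(record))
```
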

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


DataFrame writer removes fields which is null for all rows

2015-07-21 Thread Hao Ren
Consider the following code:

val df = Seq((1, 3), (2, 3)).toDF("key", "value").registerTempTable("tbl")

sqlContext.sql("select key, null as value from tbl")
  .write.format("json").mode(SaveMode.Overwrite).save("test")

sqlContext.read.format("json").load("test").printSchema()

It shows:

root
 |-- key: long (nullable = true)

The field `value` is removed from the schema when saving the DF to a JSON
file, since it is null for all rows.
Saving to a parquet file is the same. Null fields are missing!

It seems that this is the default behavior for DataFrames, but I would like to
keep the null fields for schema consistency.

Are there any options/configs for this purpose?

Thx.

-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


S3 Read / Write makes executors deadlocked

2015-07-16 Thread Hao Ren
Given the following code which just reads from s3, then saves files to s3



val inputFileName: String = "s3n://input/file/path"
val outputFileName: String = "s3n://output/file/path"
val conf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")

val sparkContext = new SparkContext(conf)

// Problems here: executors thread locked
sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)
// But this one works
sparkContext.textFile(inputFileName).count()


It blocks without showing any exceptions or errors. jstack shows that
all executors are locked. The thread dump is in end of this post.

I am using spark-1.4.0 on my PC which has 4 CPU cores.
There are 21 parquet files in the input directory, 500KB / file.

In addition, if we change the last action to a non-IO-bound one, for
example count(), it works.
It seems that S3 read and write in the same stage makes executors deadlocked.

I encountered the same problem when using DataFrame load/save
operations, jira created:
https://issues.apache.org/jira/browse/SPARK-8869

Executor task launch worker-3 #69 daemon prio=5 os_prio=0
tid=0x7f7bd4036800 nid=0x1296 in Object.wait()
[0x7f7c1099a000]
   java.lang.Thread.State: WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
- *locked* 0xe56745b8 (a
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
at 
org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416)
at 
org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
at 
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
at 
org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
at 
org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
at 
org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown 
Source)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
at 
org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
at 
org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1104)
at 
org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$13.apply(PairRDDFunctions.scala:1095)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Re: S3 Read / Write makes executors deadlocked

2015-07-16 Thread Hao Ren
I have tested on another PC which has 8 CPU cores.
But it hangs when defaultParallelismLevel >= 4, e.g.
sparkConf.setMaster("local[*]")
local[1] ~ local[3] work well.

4 is the mysterious boundary.

It seems that I am not the only one encountered this problem:
https://issues.apache.org/jira/browse/SPARK-8898

Here is Sean's answer for the jira above:
this is a jets3t problem. You will have to manage to update it in your
build or get EC2 + Hadoop 2 to work, which I think can be done. At least,
this is just a subset of EC2 should support Hadoop 2 and/or that the EC2
support should move out of Spark anyway. I don't know there's another
action to take in Spark.

But I just use sbt to get the published Spark 1.4, and it does not work on
my local PC, which is not EC2.
Seriously, I do think something should be done for Spark, because s3
read/write is quite a common use case.

Any help on this issue is highly appreciated.
If you need more info, checkout the jira I created:
https://issues.apache.org/jira/browse/SPARK-8869

On Thu, Jul 16, 2015 at 11:39 AM, Hao Ren inv...@gmail.com wrote:

 Given the following code which just reads from s3, then saves files to s3

 

 val inputFileName: String = "s3n://input/file/path"
 val outputFileName: String = "s3n://output/file/path"
 val conf = new 
 SparkConf().setAppName(this.getClass.getName).setMaster("local[4]")

 val sparkContext = new SparkContext(conf)

 // Problems here: executors thread locked
 sparkContext.textFile(inputFileName).saveAsTextFile(outputFileName)
 // But this one works
 sparkContext.textFile(inputFileName).count()
 

 It blocks without showing any exceptions or errors. jstack shows that all 
 executors are locked. The thread dump is in end of this post.

 I am using spark-1.4.0 on my PC which has 4 CPU cores.
 There are 21 parquet files in the input directory, 500KB / file.

 In addition, if we change the last action to a non IO bounded one, for 
 example, count(). It works.
 It seems that S3 read and write in the same stage makes executors deadlocked.

 I encountered the same problem when using DataFrame load/save operations, 
 jira created: https://issues.apache.org/jira/browse/SPARK-8869

 Executor task launch worker-3 #69 daemon prio=5 os_prio=0 
 tid=0x7f7bd4036800 nid=0x1296 in Object.wait() [0x7f7c1099a000]
java.lang.Thread.State: WAITING (on object monitor)
   at java.lang.Object.wait(Native Method)
   at 
 org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.doGetConnection(MultiThreadedHttpConnectionManager.java:518)
   - *locked* 0xe56745b8 (a 
 org.apache.commons.httpclient.MultiThreadedHttpConnectionManager$ConnectionPool)
   at 
 org.apache.commons.httpclient.MultiThreadedHttpConnectionManager.getConnectionWithTimeout(MultiThreadedHttpConnectionManager.java:416)
   at 
 org.apache.commons.httpclient.HttpMethodDirector.executeMethod(HttpMethodDirector.java:153)
   at 
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:397)
   at 
 org.apache.commons.httpclient.HttpClient.executeMethod(HttpClient.java:323)
   at 
 org.jets3t.service.impl.rest.httpclient.RestS3Service.performRequest(RestS3Service.java:342)
   at 
 org.jets3t.service.impl.rest.httpclient.RestS3Service.performRestHead(RestS3Service.java:718)
   at 
 org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectImpl(RestS3Service.java:1599)
   at 
 org.jets3t.service.impl.rest.httpclient.RestS3Service.getObjectDetailsImpl(RestS3Service.java:1535)
   at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1987)
   at org.jets3t.service.S3Service.getObjectDetails(S3Service.java:1332)
   at 
 org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.retrieveMetadata(Jets3tNativeFileSystemStore.java:111)
   at sun.reflect.GeneratedMethodAccessor1.invoke(Unknown Source)
   at 
 sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   at java.lang.reflect.Method.invoke(Method.java:497)
   at 
 org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:186)
   at 
 org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
   at org.apache.hadoop.fs.s3native.$Proxy15.retrieveMetadata(Unknown 
 Source)
   at 
 org.apache.hadoop.fs.s3native.NativeS3FileSystem.getFileStatus(NativeS3FileSystem.java:414)
   at org.apache.hadoop.fs.FileSystem.exists(FileSystem.java:1397)
   at 
 org.apache.hadoop.fs.s3native.NativeS3FileSystem.create(NativeS3FileSystem.java:341)
   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
   at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:798)
   at 
 org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
   at org.apache.spark.SparkHadoopWriter.open(SparkHadoopWriter.scala:90

Re: spark-submit can not resolve spark-hive_2.10

2015-07-15 Thread Hao Ren
Thanks for the reply.

Actually, I don't think excluding spark-hive from spark-submit --packages
is a good idea.

I don't want to recompile Spark with assembly for my cluster every time a
new Spark release is out.

I prefer using the binary version of Spark and then adding some jars for job
execution, e.g. adding spark-hive for HiveContext usage.

FYI, spark-hive is just 1.2MB:
http://mvnrepository.com/artifact/org.apache.spark/spark-hive_2.10/1.4.0

On Wed, Jul 8, 2015 at 2:03 AM, Burak Yavuz brk...@gmail.com wrote:

 spark-hive is excluded when using --packages, because it can be included
 in the spark-assembly by adding -Phive during mvn package or sbt assembly.

 Best,
 Burak

 On Tue, Jul 7, 2015 at 8:06 AM, Hao Ren inv...@gmail.com wrote:

 I want to add spark-hive as a dependence to submit my job, but it seems
 that
 spark-submit can not resolve it.

 $ ./bin/spark-submit \
 → --packages

 org.apache.spark:spark-hive_2.10:1.4.0,org.postgresql:postgresql:9.3-1103-jdbc3,joda-time:joda-time:2.8.1
 \
 → --class fr.leboncoin.etl.jobs.dwh.AdStateTraceDWHTransform \
 → --master spark://localhost:7077 \

 Ivy Default Cache set to: /home/invkrh/.ivy2/cache
 The jars for the packages stored in: /home/invkrh/.ivy2/jars
 https://repository.jboss.org/nexus/content/repositories/releases/ added
 as a
 remote repository with the name: repo-1
 :: loading settings :: url =

 jar:file:/home/invkrh/workspace/scala/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
 org.apache.spark#spark-hive_2.10 added as a dependency
 org.postgresql#postgresql added as a dependency
 joda-time#joda-time added as a dependency
 :: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
 confs: [default]
 found org.postgresql#postgresql;9.3-1103-jdbc3 in local-m2-cache
 found joda-time#joda-time;2.8.1 in central
 :: resolution report :: resolve 139ms :: artifacts dl 3ms
 :: modules in use:
 joda-time#joda-time;2.8.1 from central in [default]
 org.postgresql#postgresql;9.3-1103-jdbc3 from local-m2-cache in
 [default]

 -
 |  |modules||
  artifacts   |
 |   conf   | number| search|dwnlded|evicted||
 number|dwnlded|

 -
 |  default |   2   |   0   |   0   |   0   ||   2   |
  0   |

 -
 :: retrieving :: org.apache.spark#spark-submit-parent
 confs: [default]
 0 artifacts copied, 2 already retrieved (0kB/6ms)
 Exception in thread main java.lang.NoClassDefFoundError:
 org/apache/spark/sql/hive/HiveContext
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:348)
 at

 org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633)
 at
 org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
 at
 org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
 at
 org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
 at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
 Caused by: java.lang.ClassNotFoundException:
 org.apache.spark.sql.hive.HiveContext
 at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 ... 7 more
 Using Spark's default log4j profile:
 org/apache/spark/log4j-defaults.properties
 15/07/07 16:57:59 INFO Utils: Shutdown hook called

 Any help is appreciated. Thank you.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-can-not-resolve-spark-hive-2-10-tp23695.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


[SPARK-SQL] Window Functions optimization

2015-07-13 Thread Hao Ren
Hi, 

I would like to know: has any optimization been done for window
functions in Spark SQL?

For example.

select key,
max(value1) over(partition by key) as m1,
max(value2) over(partition by key) as m2,
max(value3) over(partition by key) as m3
from table

The query above creates 3 fields based on the same partition rule. 

The question is:
Will Spark SQL partition the table 3 times in the same way to get the three
max values, or partition it just once if it finds that the partition rule is
the same?

It would be nice if someone could point me to the relevant lines of code.
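
For what it's worth, one way to check from the user side is to look at the physical plan and count the Window / Exchange operators; a sketch, assuming the query above is run through sqlContext:

```
val df = sqlContext.sql("""
  select key,
         max(value1) over(partition by key) as m1,
         max(value2) over(partition by key) as m2,
         max(value3) over(partition by key) as m3
  from table
""")

// If the three window expressions share one partitioning, a single Window operator
// (and a single shuffle/Exchange) should show up in the extended plan.
df.explain(true)
```
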

Thank you.
Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-Window-Functions-optimization-tp23796.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark-submit can not resolve spark-hive_2.10

2015-07-07 Thread Hao Ren
I want to add spark-hive as a dependency to submit my job, but it seems that
spark-submit cannot resolve it.

$ ./bin/spark-submit \
→ --packages
org.apache.spark:spark-hive_2.10:1.4.0,org.postgresql:postgresql:9.3-1103-jdbc3,joda-time:joda-time:2.8.1
\
→ --class fr.leboncoin.etl.jobs.dwh.AdStateTraceDWHTransform \
→ --master spark://localhost:7077 \

Ivy Default Cache set to: /home/invkrh/.ivy2/cache
The jars for the packages stored in: /home/invkrh/.ivy2/jars
https://repository.jboss.org/nexus/content/repositories/releases/ added as a
remote repository with the name: repo-1
:: loading settings :: url =
jar:file:/home/invkrh/workspace/scala/spark/assembly/target/scala-2.10/spark-assembly-1.4.0-SNAPSHOT-hadoop2.2.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-hive_2.10 added as a dependency
org.postgresql#postgresql added as a dependency
joda-time#joda-time added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found org.postgresql#postgresql;9.3-1103-jdbc3 in local-m2-cache
found joda-time#joda-time;2.8.1 in central
:: resolution report :: resolve 139ms :: artifacts dl 3ms
:: modules in use:
joda-time#joda-time;2.8.1 from central in [default]
org.postgresql#postgresql;9.3-1103-jdbc3 from local-m2-cache in 
[default]
-
|  |modules||   artifacts   |
|   conf   | number| search|dwnlded|evicted|| number|dwnlded|
-
|  default |   2   |   0   |   0   |   0   ||   2   |   0   |
-
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
0 artifacts copied, 2 already retrieved (0kB/6ms)
Exception in thread main java.lang.NoClassDefFoundError:
org/apache/spark/sql/hive/HiveContext
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:633)
at 
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException:
org.apache.spark.sql.hive.HiveContext
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties
15/07/07 16:57:59 INFO Utils: Shutdown hook called

Any help is appreciated. Thank you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-submit-can-not-resolve-spark-hive-2-10-tp23695.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



[SPARK-SQL] Re-use col alias in the select clause to avoid sub query

2015-07-06 Thread Hao Ren
Hi, 

I want to re-use a column alias in the select clause to avoid a sub-query.

For example:

select check(key) as b, abs(b) as abs, value1, value2, ..., value30
from test

The query above does not work, because b is not defined in the test's
schema. Instead, I should change the query to the following:

select check(key) as b, abs(check(key)) as abs, value1, value2, ..., value30
from test

Apparently, the check function is called twice. In my use case, the check
function is time-consuming.

The workaround is to use sub-query :

select b, abs(b) as abs, value1, value2, ..., value30
from (
  select check(key) as b, value1, value2, ..., value30 from test
) t

The problem is that I have to repeat the following 30 columns twice. Imagine
the following case, which does not work:

select check(key) as b, abs(b) as absv,  tan(absv) as tanv, value1, value2,
..., value30
from test

In order not to call my check function many times, I need to change the
query to 3 sub-queries, which makes the query too long and hard to read.

I am wondering whether we can reuse a column alias in an efficient way?
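
For reference, a sketch of the sub-query workaround written so that the 30 value columns do not have to be listed, assuming check is registered as a UDF and that a qualified star is accepted alongside other select expressions:

```
// check(key) is evaluated once in the inner query; the outer query reuses its alias b.
sqlContext.sql("""
  SELECT t.*, abs(b) AS absv, tan(abs(b)) AS tanv
  FROM (SELECT test.*, check(key) AS b FROM test) t
""").show()
```
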

Thank you








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-SQL-Re-use-col-alias-in-the-select-clause-to-avoid-sub-query-tp23645.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkContext and JavaSparkContext

2015-06-29 Thread Hao Ren
It seems that JavaSparkContext is just a wrapper around the Scala SparkContext.

In JavaSparkContext, the Scala one is used to do all the work.

If I pass the same Scala SparkContext to initialize JavaSparkContext, I am
still manipulating the same SparkContext.
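
A minimal sketch of that observation (assuming an existing SQLContext named sqlContext):

```
import org.apache.spark.api.java.JavaSparkContext

// Wrapping does not create a second context; it reuses the same underlying one.
val jsc = JavaSparkContext.fromSparkContext(sqlContext.sparkContext)
assert(jsc.sc eq sqlContext.sparkContext) // same SparkContext instance
```
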

Sry for spamming.

Hao

On Mon, Jun 29, 2015 at 11:15 AM, Hao Ren inv...@gmail.com wrote:

 Hi,

 I am working on legacy project using spark java code.

 I have a function which takes sqlContext as an argument, however, I need a
 JavaSparkContext in that function.

 It seems that sqlContext.sparkContext() return a scala sparkContext.

 I did not find any API for casting a scala sparkContext to a java one
 except
 :

 new JavaSparkContext(sqlContext.sparkContext())

 I think it will create a new sparkContext. So there will be multiple
 sparkContexts during run time.

 According to some posts, there are some limitations on this. But I did not
 encounter that.

 Question:

 What is the best way to cast a scala sparkContext to a java one ?
 What problem will multiple sparkContext cause ?

 Thank you. =)

 Hao



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-and-JavaSparkContext-tp23525.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org




-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


SparkContext and JavaSparkContext

2015-06-29 Thread Hao Ren
Hi, 

I am working on a legacy project using Spark Java code.

I have a function which takes sqlContext as an argument, however, I need a
JavaSparkContext in that function.

It seems that sqlContext.sparkContext() returns a Scala SparkContext.

I did not find any API for casting a scala sparkContext to a java one except
:

new JavaSparkContext(sqlContext.sparkContext())

I think it will create a new SparkContext, so there will be multiple
SparkContexts at run time.

According to some posts, there are some limitations on this. But I did not
encounter that.

Question:

What is the best way to cast a Scala SparkContext to a Java one?
What problems will multiple SparkContexts cause?

Thank you. =)

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkContext-and-JavaSparkContext-tp23525.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: map vs mapPartitions

2015-06-25 Thread Hao Ren
It's not the number of executors that matters, but the # of the CPU cores
of your cluster.

Each partition will be loaded on a core for computing.

e.g. A cluster of 3 nodes has 24 cores, and you divide the RDD into 24
partitions (24 tasks for a narrow dependency).
Then all 24 partitions will be loaded onto your cluster in parallel, one
on each core.
You may notice that some tasks finish more quickly than others, so divide
the RDD into (2~3) x (# of cores) partitions for better pipelining.
Say we have 72 partitions in your RDD: initially 24 tasks run on 24 cores,
then it's first done, first served until all 72 tasks are processed.

Back to your original question: map and mapPartitions are both
transformations, but at different granularities.
map = apply the function to each record in each partition.
mapPartitions = apply the function to each partition.
But the rule is the same: one partition per core.
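
A minimal sketch of the difference (assuming an RDD[String] named rdd; the names are illustrative):

```
// map: the function runs once per record.
val lengths1 = rdd.map(_.length)

// mapPartitions: the function runs once per partition and receives an iterator,
// so any per-partition setup cost (parsers, connections, ...) is paid only once.
val lengths2 = rdd.mapPartitions { iter =>
  // expensive setup could go here, once per partition
  iter.map(_.length)
}
```
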

Hope it helps.
Hao




On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 say source is HDFS,And file is divided in 10 partitions. so what will be
  input contains.

 public IterableInteger call(IteratorString input)

 say I have 10 executors in job each having single partition.

 will it have some part of partition or complete. And if some when I call
 input.next() - it will fetch rest or how is it handled ?





 On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen so...@cloudera.com wrote:

 No, or at least, it depends on how the source of the partitions was
 implemented.

 On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
 shushantaror...@gmail.com wrote:
  Does mapPartitions keep complete partitions in memory of executor as
  iterable.
 
  JavaRDD<String> rdd = jsc.textFile(path);
  JavaRDD<Integer> output = rdd.mapPartitions(new
  FlatMapFunction<Iterator<String>, Integer>() {
 
  public Iterable<Integer> call(Iterator<String> input)
  throws Exception {
  List<Integer> output = new ArrayList<Integer>();
  while(input.hasNext()){
  output.add(input.next().length());
  }
  return output;
  }
 
  });
 
 
  Here does input is present in memory and can contain complete partition
 of
  gbs ?
  Will this function call(IteratorString input) is called only for no of
  partitions(say if I have 10 in this example) times. Not no of lines
  times(say 1000) .
 
 
  And whats the use of mapPartitionsWithIndex ?
 
  Thanks
 





-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France


Fwd: map vs mapPartitions

2015-06-25 Thread Hao Ren
-- Forwarded message --
From: Hao Ren inv...@gmail.com
Date: Thu, Jun 25, 2015 at 7:03 PM
Subject: Re: map vs mapPartitions
To: Shushant Arora shushantaror...@gmail.com


In fact, map and mapPartitions produce RDDs of the same type:
MapPartitionsRDD.

Check RDD api source code below:

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}

So even map uses an iterator!

For map, `iter.map(cleanF)` means that when an action is called, the passed
function must be applied to all records in each partition.
For mapPartitions, your function is applied to an iterator. There is no
guarantee that all records will be loaded into memory. For example,
if the function just takes the first record, e.g.
rdd.mapPartitions(iter => Iterator.single(iter.next)), the iterator is not
fully traversed.
It really depends on your function. It gives you control at the partition
level. Just that.

The two APIs are for different purposes. The choice depends on your need.

In the given example, your mapPartitions is doing the same thing as map, i.e.
rdd.map(_.length). The performance is the same.


On Thu, Jun 25, 2015 at 5:36 PM, Shushant Arora shushantaror...@gmail.com
wrote:

 yes, 1 partition per core and  mapPartitions apply function on each
 partition.

 Question is Does complete partition loads in memory so that function can
 be applied to it or its an iterator and iterator.next() loads next record
 and if yes then how is it efficient than map which also works on 1 record
 at a time.


 Is the only difference is -- only while loop as in below runs per record
 as in map . But code above that will be run once per partition.


 public IterableInteger call(IteratorString input)
 throws Exception {
 ListInteger output = new ArrayListInteger();
 while(input.hasNext()){
 output.add(input.next().length());
  }


 so if I don't have any heavy code above while loop, performance will be
 same as of map function.



 On Thu, Jun 25, 2015 at 6:51 PM, Hao Ren inv...@gmail.com wrote:

 It's not the number of executors that matters, but the # of the CPU cores
 of your cluster.

 Each partition will be loaded on a core for computing.

 e.g. A cluster of 3 nodes has 24 cores, and you divide the RDD in 24
 partitions (24 tasks for narrow dependency).
 Then all the 24 partitions will be loaded to your cluster in parallel,
 one on each core.
 You may notice that some tasks will finish more quickly than others. So
 divide the RDD into (2~3) x (# of cores) for better pipeline performance.
 Say we have 72 partitions in your RDD, then initially 24 tasks run on 24
 cores, then first done first served until all 72 tasks are processed.

 Back to your origin question, map and mapPartitions are both
 transformation, but on different granularity.
 map = apply the function on each record in each partition.
 mapPartitions = apply the function on each partition.
 But the rule is the same, one partition per core.

 Hope it helps.
 Hao




 On Thu, Jun 25, 2015 at 1:28 PM, Shushant Arora 
 shushantaror...@gmail.com wrote:

 Say the source is HDFS and the file is divided into 10 partitions. So what
 will input contain in

 public Iterable<Integer> call(Iterator<String> input)

 Say I have 10 executors in the job, each holding a single partition.

 Will input hold part of the partition or the complete one? And if only part,
 when I call input.next(), will it fetch the rest? How is that handled?





 On Thu, Jun 25, 2015 at 3:11 PM, Sean Owen so...@cloudera.com wrote:

 No, or at least, it depends on how the source of the partitions was
 implemented.

 On Thu, Jun 25, 2015 at 12:16 PM, Shushant Arora
 shushantaror...@gmail.com wrote:
  Does mapPartitions keep the complete partition in executor memory as an
  iterable?
 
  JavaRDD<String> rdd = jsc.textFile(path);
  JavaRDD<Integer> output = rdd.mapPartitions(new
  FlatMapFunction<Iterator<String>, Integer>() {

  public Iterable<Integer> call(Iterator<String> input)
  throws Exception {
  List<Integer> output = new ArrayList<Integer>();
  while (input.hasNext()) {
  output.add(input.next().length());
  }
  return output;
  }

  });
 
 
  Here, is input present in memory, and can it contain a complete partition
  of GBs?
  Will this function call(Iterator<String> input) be called only as many
  times as there are partitions (say 10 in this example), and not as many
  times as there are lines (say 1000)?
 
 
  And what's the use of mapPartitionsWithIndex?
 
  Thanks
 





 --
 Hao Ren

 Data Engineer @ leboncoin

 Paris, France





-- 
Hao Ren

Data Engineer @ leboncoin

Paris, France





Big performance difference when joining 3 tables in different order

2015-06-04 Thread Hao Ren
Hi,

I encountered a performance issue when joining 3 tables in SparkSQL.

Here is the query:

SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt
FROM t_category c, t_zipcode z, click_meter_site_grouped g
WHERE c.refCategoryID = g.category AND z.regionCode = g.region

I need to pay close attention to the table order in the FROM clause:
some orders break the driver,
some orders make the job extremely slow,
and only one order lets the job finish quickly.

For the slow one, I noticed that one table is loaded 56 times (!) from its
CSV file.

I would like to know more about the join implementation in SparkSQL to
understand the issue (auto broadcast, etc.).
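
As a first step in digging (a sketch only; hiveContext is assumed to be the
HiveContext used to query these tables, and the threshold value is purely
illustrative), comparing the physical plans for each FROM-clause order and
nudging the auto-broadcast threshold shows whether Spark picks a broadcast
join or falls back to a shuffled/cartesian plan:

// Inspect the physical plan Spark SQL chooses for this FROM-clause order.
hiveContext.sql("EXPLAIN EXTENDED " +
  "SELECT g.period, c.categoryName, z.regionName, action, list_id, cnt " +
  "FROM t_category c, t_zipcode z, click_meter_site_grouped g " +
  "WHERE c.refCategoryID = g.category AND z.regionCode = g.region")
  .collect()
  .foreach(println)

// Tables below this size (in bytes) may be broadcast instead of shuffled;
// 50 MB here is an illustrative value, not a recommendation.
hiveContext.sql("SET spark.sql.autoBroadcastJoinThreshold=52428800")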

For those who want more details, here is the JIRA:
https://issues.apache.org/jira/browse/SPARK-8102

Any help is welcome. =) Thx

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Big-performance-difference-when-joining-3-tables-in-different-order-tp23150.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-23 Thread Hao Ren
Should I repost this to dev list ?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616p22640.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



why does groupByKey return RDD[(K, Iterable[V])] not RDD[(K, CompactBuffer[V])] ?

2015-04-22 Thread Hao Ren
Hi,

Just a quick question,

Regarding the source code of groupByKey:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala#L453

In the end, it casts the CompactBuffer to Iterable. But why? Is there any
advantage?
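
For context, a minimal sketch (the data is illustrative) of how I consume the
result; callers only rely on the Iterable interface, so the concrete
CompactBuffer stays hidden behind the signature:

import org.apache.spark.SparkContext._   // pair-RDD implicits on older versions

val pairs = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))

// Public signature: RDD[(K, Iterable[V])], whatever the buffer underneath is.
val grouped: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = pairs.groupByKey()

grouped.mapValues(_.sum).collect().foreach(println)   // (a,3), (b,3)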

Thank you.

Hao.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/why-does-groupByKey-return-RDD-K-Iterable-V-not-RDD-K-CompactBuffer-V-tp22616.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SQL can't not create Hive database

2015-04-09 Thread Hao Ren
Hi,

I am working on the local mode.

The following code 

hiveContext.setConf("hive.metastore.warehouse.dir",
  "/home/spark/hive/warehouse")
hiveContext.sql("create database if not exists db1")

 throws

15/04/09 13:53:16 ERROR RetryingHMSHandler: MetaException(message:Unable to
create database path file:/user/hive/warehouse/db1.db, failed to create
database db1)

It seems that it uses the default /user/hive/warehouse path, not the local
one specified via hiveContext.
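
One workaround worth trying (a sketch only, based on the setConf ordering
quirk described in the "HiveContext setConf seems not stable" thread further
down): set the warehouse dir twice, or set another key first, and verify it
before creating the database.

hiveContext.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse")
hiveContext.setConf("hive.metastore.warehouse.dir", "/home/spark/hive/warehouse")
hiveContext.getAllConfs.filter(_._1.contains("warehouse.dir")).foreach(println)

hiveContext.sql("create database if not exists db1")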

Any idea ? Thank you.

Hao




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SQL-can-t-not-create-Hive-database-tp22435.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: The differentce between SparkSql/DataFram join and Rdd join

2015-04-08 Thread Hao Ren
)
at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236)
at
akka.actor.ActorSystemImpl$TerminationCallbacks.ready(ActorSystem.scala:817)
at
akka.actor.ActorSystemImpl$TerminationCallbacks.ready(ActorSystem.scala:786)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
at scala.concurrent.Await$$anonfun$ready$1.apply(package.scala:86)
at
scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
at scala.concurrent.Await$.ready(package.scala:86)
at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:642)
at akka.actor.ActorSystemImpl.awaitTermination(ActorSystem.scala:643)
at org.apache.spark.deploy.worker.Worker$.main(Worker.scala:531)
at org.apache.spark.deploy.worker.Worker.main(Worker.scala)

VM Thread prio=10 tid=0x7f149407d000 nid=0xe75 runnable

GC task thread#0 (ParallelGC) prio=10 tid=0x7f149401f000 nid=0xe6d
runnable

GC task thread#1 (ParallelGC) prio=10 tid=0x7f1494021000 nid=0xe6e
runnable

GC task thread#2 (ParallelGC) prio=10 tid=0x7f1494022800 nid=0xe6f
runnable

GC task thread#3 (ParallelGC) prio=10 tid=0x7f1494024800 nid=0xe70
runnable

GC task thread#4 (ParallelGC) prio=10 tid=0x7f1494026800 nid=0xe71
runnable

GC task thread#5 (ParallelGC) prio=10 tid=0x7f1494028000 nid=0xe72
runnable

GC task thread#6 (ParallelGC) prio=10 tid=0x7f149402a000 nid=0xe73
runnable

GC task thread#7 (ParallelGC) prio=10 tid=0x7f149402c000 nid=0xe74
runnable

VM Periodic Task Thread prio=10 tid=0x7f14940c2800 nid=0xe7c waiting
on condition

JNI global references: 230


Tell me if anything else is needed.

Thank you.

Hao.


On Tue, Apr 7, 2015 at 8:00 PM, Michael Armbrust mich...@databricks.com
wrote:

 The joins here are totally different implementations, but it is worrisome
 that you are seeing the SQL join hanging.  Can you provide more information
 about the hang?  jstack of the driver and a worker that is processing a
 task would be very useful.

 On Tue, Apr 7, 2015 at 8:33 AM, Hao Ren inv...@gmail.com wrote:

 Hi,

 We have 2 hive tables and want to join one with the other.

 Initially, we ran a sql request on HiveContext. But it did not work. It
 was
 blocked on 30/600 tasks.
 Then we tried to load tables into two DataFrames, we have encountered the
 same problem.
 Finally, it works with RDD.join. What we have done is basically
 transforming
 2 tables into 2 pair RDDs, then calling a join operation. It works great
 in
 about 500 s.

 However, workaround is just a workaround, since we have to transform hive
 tables into RDD. This is really annoying.

 Just wondering whether the underlying code of DF/SQL's join operation is
 the
 same as rdd's, knowing that there is a syntax analysis layer for DF/SQL,
 while RDD's join is straightforward on two pair RDDs.

 SQL request:
 --
 select v1.receipt_id, v1.sku, v1.amount, v1.qty, v2.discount
 from table1 as v1 left join table2 as v2
 on v1.receipt_id = v2.receipt_id
 where v1.sku != ''

 DataFrame:

 -
  val rdd1 = ss.hiveContext.table("table1")
  val rdd1Filt = rdd1.filter(rdd1.col("sku") !== "")
  val rdd2 = ss.hiveContext.table("table2")
  val rddJoin = rdd1Filt.join(rdd2, rdd1Filt("receipt_id") ===
  rdd2("receipt_id"))
  rddJoin.saveAsTable("testJoinTable", SaveMode.Overwrite)

 RDD workaround in this case is a bit cumbersome, for short, we just
 created
 2 RDDs, join, and then apply a new schema on the result RDD. This approach
 works, at least all tasks were finished, while the DF/SQL approach don't.

 Any idea ?




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/The-differentce-between-SparkSql-DataFram-join-and-Rdd-join-tp22407.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





-- 
Hao Ren

{Data, Software} Engineer @ ClaraVista

Paris, France


The differentce between SparkSql/DataFram join and Rdd join

2015-04-07 Thread Hao Ren
Hi,

We have 2 hive tables and want to join one with the other.

Initially, we ran a SQL request on HiveContext, but it did not work: it was
blocked at 30/600 tasks.
Then we tried to load the tables into two DataFrames and encountered the
same problem.
Finally, it works with RDD.join. What we did is basically transform the
2 tables into 2 pair RDDs, then call a join operation. It works great, in
about 500 s.

However, a workaround is just a workaround, since we still have to transform
the Hive tables into RDDs. This is really annoying.

Just wondering whether the underlying code of the DF/SQL join operation is
the same as the RDD one, knowing that there is a syntax analysis layer for
DF/SQL, while RDD's join works directly on two pair RDDs.

SQL request:
--
select v1.receipt_id, v1.sku, v1.amount, v1.qty, v2.discount
from table1 as v1 left join table2 as v2
on v1.receipt_id = v2.receipt_id
where v1.sku != ''

DataFrame:
-
val rdd1 = ss.hiveContext.table("table1")
val rdd1Filt = rdd1.filter(rdd1.col("sku") !== "")
val rdd2 = ss.hiveContext.table("table2")
val rddJoin = rdd1Filt.join(rdd2, rdd1Filt("receipt_id") ===
rdd2("receipt_id"))
rddJoin.saveAsTable("testJoinTable", SaveMode.Overwrite)

The RDD workaround in this case is a bit cumbersome: in short, we created
2 pair RDDs, joined them, and then applied a new schema to the result RDD.
This approach works, at least all tasks finished, while the DF/SQL approach
didn't.
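
For reference, a rough sketch of that RDD workaround (illustrative only: the
real tables have more columns, and receipt_id is assumed to be a string;
ss.hiveContext is the same wrapper as above):

import org.apache.spark.SparkContext._   // pair-RDD implicits on older versions
import org.apache.spark.sql.Row

val df1 = ss.hiveContext.table("table1")
val df2 = ss.hiveContext.table("table2")
val df1Filt = df1.filter(df1.col("sku") !== "")

// Key both sides by receipt_id and join them as plain pair RDDs.
val idx1 = df1Filt.schema.fieldNames.indexOf("receipt_id")
val idx2 = df2.schema.fieldNames.indexOf("receipt_id")
val pairs1 = df1Filt.rdd.map(row => (row.getString(idx1), row))
val pairs2 = df2.rdd.map(row => (row.getString(idx2), row))

val joined: org.apache.spark.rdd.RDD[(String, (Row, Option[Row]))] =
  pairs1.leftOuterJoin(pairs2)

// The joined pairs are then flattened back into Rows and given a StructType
// schema with createDataFrame / applySchema.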

Any idea ?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/The-differentce-between-SparkSql-DataFram-join-and-Rdd-join-tp22407.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: HiveContext setConf seems not stable

2015-04-02 Thread Hao Ren
Hi,

Jira created: https://issues.apache.org/jira/browse/SPARK-6675

Thank you.


On Wed, Apr 1, 2015 at 7:50 PM, Michael Armbrust mich...@databricks.com
wrote:

 Can you open a JIRA please?

 On Wed, Apr 1, 2015 at 9:38 AM, Hao Ren inv...@gmail.com wrote:

 Hi,

 I find HiveContext.setConf does not work correctly. Here are some code
 snippets showing the problem:

 snippet 1:

 
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.{SparkConf, SparkContext}

 object Main extends App {

   val conf = new SparkConf()
 .setAppName("context-test")
 .setMaster("local[8]")
   val sc = new SparkContext(conf)
   val hc = new HiveContext(sc)

   hc.setConf("spark.sql.shuffle.partitions", "10")
   hc.setConf("hive.metastore.warehouse.dir",
 "/home/spark/hive/warehouse_test")
   hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
   hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach
 println
 }

 

 *Results:*
 (hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
 (spark.sql.shuffle.partitions,10)

 snippet 2:

 
 ...
   hc.setConf("hive.metastore.warehouse.dir",
 "/home/spark/hive/warehouse_test")
   hc.setConf("spark.sql.shuffle.partitions", "10")
   hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
   hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach
 println
 ...

 

 *Results:*
 (hive.metastore.warehouse.dir,/user/hive/warehouse)
 (spark.sql.shuffle.partitions,10)

  *You can see that I just permuted the two setConf calls, and that leads
  to two different Hive configurations.*
  *It seems that HiveContext cannot set a new value for the
  hive.metastore.warehouse.dir key if it is the first setConf call.*
  *You need another setConf call before changing
  hive.metastore.warehouse.dir; for example, set
  hive.metastore.warehouse.dir twice (snippet 3), or set another key first, as in snippet 1.*

 snippet 3:

 
 ...
   hc.setConf("hive.metastore.warehouse.dir",
 "/home/spark/hive/warehouse_test")
   hc.setConf("hive.metastore.warehouse.dir",
 "/home/spark/hive/warehouse_test")
   hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
 ...

 

 *Results:*
 (hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)


 *You can reproduce this if you move to the latest branch-1.3
 (1.3.1-snapshot, htag = 7d029cb1eb6f1df1bce1a3f5784fb7ce2f981a33)*

 *I have also tested the released 1.3.0 (htag =
 4aaf48d46d13129f0f9bdafd771dd80fe568a7dc). It has the same problem.*

 *Please tell me if I am missing something. Any help is highly
 appreciated.*

 Hao

 --
 Hao Ren

 {Data, Software} Engineer @ ClaraVista

 Paris, France





-- 
Hao Ren

{Data, Software} Engineer @ ClaraVista

Paris, France


HiveContext setConf seems not stable

2015-04-01 Thread Hao Ren
Hi,

I find HiveContext.setConf does not work correctly. Here are some code
snippets showing the problem:

snippet 1:

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object Main extends App {

  val conf = new SparkConf()
    .setAppName("context-test")
    .setMaster("local[8]")
  val sc = new SparkContext(conf)
  val hc = new HiveContext(sc)

  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.setConf("hive.metastore.warehouse.dir",
    "/home/spark/hive/warehouse_test")
  hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
}


*Results:*
(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)
(spark.sql.shuffle.partitions,10)

snippet 2:

...
  hc.setConf("hive.metastore.warehouse.dir",
    "/home/spark/hive/warehouse_test")
  hc.setConf("spark.sql.shuffle.partitions", "10")
  hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
  hc.getAllConfs filter (_._1.contains("shuffle.partitions")) foreach println
...


*Results:*
(hive.metastore.warehouse.dir,/user/hive/warehouse)
(spark.sql.shuffle.partitions,10)

*You can see that I just permuted the two setConf calls, and that leads to
two different Hive configurations.*
*It seems that HiveContext cannot set a new value for the
hive.metastore.warehouse.dir key if it is the first setConf call.*
*You need another setConf call before changing
hive.metastore.warehouse.dir; for example, set
hive.metastore.warehouse.dir twice (snippet 3), or set another key first, as in snippet 1.*

snippet 3:

...
  hc.setConf("hive.metastore.warehouse.dir",
    "/home/spark/hive/warehouse_test")
  hc.setConf("hive.metastore.warehouse.dir",
    "/home/spark/hive/warehouse_test")
  hc.getAllConfs filter (_._1.contains("warehouse.dir")) foreach println
...


*Results:*
(hive.metastore.warehouse.dir,/home/spark/hive/warehouse_test)


*You can reproduce this if you move to the latest branch-1.3
(1.3.1-snapshot, htag = 7d029cb1eb6f1df1bce1a3f5784fb7ce2f981a33)*

*I have also tested the released 1.3.0 (htag =
4aaf48d46d13129f0f9bdafd771dd80fe568a7dc). It has the same problem.*

*Please tell me if I am missing something. Any help is highly appreciated.*

Hao

-- 
Hao Ren

{Data, Software} Engineer @ ClaraVista

Paris, France


Broadcast variable questions

2015-01-21 Thread Hao Ren
Hi,

Spark 1.2.0, standalone, local mode(for test)

Here are several questions on broadcast variables:

1) Where is the broadcast variable cached on the executors? In memory or on
disk?

I read somewhere that these variables are stored in spark.local.dir, but I
can't find any info in the Spark 1.2 documentation. I encountered a problem
with broadcast variables: I have a loop in which a broadcast variable is
created, and after 3 iterations the used memory increases quickly until it
is full, and Spark is blocked: no error message, no exception, just blocked.
I would like to make sure whether this is caused by too many broadcast
variables, because I did not call unpersist() on each broadcast variable.

2) I see that a broadcast variable has both a destroy() and an unpersist()
method; what's the difference between them? If a broadcast variable is
destroyed, is it removed from wherever it is stored?
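
Coming back to 1), here is a minimal sketch of the pattern I have in mind
(assuming the growth really comes from broadcasts that are never released;
buildLookupTable is a hypothetical stand-in for whatever each iteration
broadcasts): unpersist each broadcast once the jobs that use it are done.

// Hypothetical per-iteration payload.
def buildLookupTable(i: Int): Set[Int] = (1 to 100).map(_ * i).toSet

val data = sc.parallelize(1 to 1000000)

for (i <- 1 to 10) {
  val lookup = sc.broadcast(buildLookupTable(i))

  val hits = data.filter(x => lookup.value.contains(x)).count()
  println(s"iteration $i: $hits hits")

  // Drop this broadcast's blocks on the executors (and driver)
  // before creating the next one.
  lookup.unpersist(blocking = true)
}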

Hao







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-variable-questions-tp21292.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SchemaRDD.sample problem

2014-12-23 Thread Hao Ren
update:

t1 is good. After collecting t1, I find that all rows are ok (is_new = 0).
Just after sampling, there are some rows where is_new = 1, which should have
been filtered out by the WHERE clause.





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-sample-problem-tp20741p20833.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SchemaRDD.sample problem

2014-12-17 Thread Hao Ren
Hi,

I am using SparkSQL on the 1.2.1 branch. The problem comes from the following
4-line code:

val t1: SchemaRDD = hiveContext hql "select * from product where is_new = 0"
val tb1: SchemaRDD = t1.sample(withReplacement = false, fraction = 0.05)
tb1.registerTempTable("t1_tmp")
(hiveContext sql "select count(*) from t1_tmp where is_new = 1").collect() foreach println

We know that *t1* contains only rows whose is_new field is zero.
After sampling t1 by taking 5% of the rows, the sampled table should
always contain only rows where is_new = 0. However, line 4 gives a number
around 5 by chance. That means there are some rows where is_new = 1 in the
sampled table, which is not logically possible.

I am not sure SchemaRDD.sample is doing its job correctly.

Any idea ?

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SchemaRDD-sample-problem-tp20741.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSQL 1.2.1-snapshot Left Join problem

2014-12-17 Thread Hao Ren
Hi,

When running SparkSQL branch 1.2.1 on EC2 standalone cluster, the following
query does not work:

create table debug as
select v1.* 
from t1 as v1 left join t2 as v2
on v1.sku = v2.sku
where v2.sku is null

Both t1 and t2 have 200 partitions.
t1 has 10k rows, and t2 has 4k rows.

This query blocks at:

14/12/17 15:56:54 INFO TaskSetManager: Finished task 133.0 in stage 2.0 (TID
541) in 370 ms on ip-10-79-184-49.ec2.internal (122/200)

Via the WebUI, I can see there are 24 tasks running, as the cluster has 24
cores. The other tasks have succeeded. It seems that these 24 tasks are
blocked and will never end.

However, SparkSQL 1.1.0 works fine. There might be some problem with joins
in 1.2.1.

Any idea?

Hao





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-1-2-1-snapshot-Left-Join-problem-tp20748.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



registerTempTable: Table not found

2014-12-09 Thread Hao Ren
Hi, 

I am using Spark SQL on 1.2.1-snapshot.

Here is the problem I encountered. Basically, I want to save a SchemaRDD to
the HiveContext.

val scm = StructType(
  Seq(
    StructField("name", StringType, nullable = false),
    StructField("cnt", IntegerType, nullable = false)
  ))

val schRdd = hiveContext.applySchema(ranked, scm)
// ranked above is an RDD[Row] whose rows contain 2 fields
schRdd.registerTempTable("schRdd")

hiveContext sql "select count(name) from schRdd limit 20" // => ok

hiveContext sql "create table t as select * from schRdd" // => table not found


A query like the select above works well and gives the correct answer, but
when I try to save the temp table into the HiveContext with
createTableAsSelect, it does not work.

*Caused by: org.apache.hadoop.hive.ql.parse.SemanticException: Line 1:32
Table not found 'schRdd'
at
org.apache.hadoop.hive.ql.parse.SemanticAnalyzer.getMetaData(SemanticAnalyzer.java:1243)*

I thought that was caused by registerTempTable, so I replaced it with
saveAsTable. That does not work either.

*Exception in thread main java.lang.AssertionError: assertion failed: No
plan for CreateTableAsSelect Some(sephcn), schRdd, false, None
 LogicalRDD [name#6,cnt#7], MappedRDD[3] at map at Creation.scala:70

at scala.Predef$.assert(Predef.scala:179)
at
org.apache.spark.sql.catalyst.planning.QueryPlanner.apply(QueryPlanner.scala:59)*

I also checked source code of QueryPlanner:

def apply(plan: LogicalPlan): Iterator[PhysicalPlan] = {
  // Obviously a lot to do here still...
  val iter = strategies.view.flatMap(_(plan)).toIterator
  assert(iter.hasNext, s"No plan for $plan")
  iter
}

The comment shows that there is still some work to do here.

Any help is appreciated.

Thx.

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/registerTempTable-Table-not-found-tp20592.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: registerTempTable: Table not found

2014-12-09 Thread Hao Ren
Thank you.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/registerTempTable-Table-not-found-tp20592p20594.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



scala.MatchError on SparkSQL when creating ArrayType of StructType

2014-12-05 Thread Hao Ren
Hi, 

I am using SparkSQL on 1.1.0 branch. 

The following code leads to a scala.MatchError 
at
org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247) 

val scm = StructType(inputRDD.schema.fields.init :+
  StructField("list",
    ArrayType(
      StructType(
        Seq(StructField("date", StringType, nullable = false),
            StructField("nbPurchase", IntegerType, nullable = false)))),
    nullable = false))

// purchaseRDD is an RDD[sql.Row] whose schema corresponds to scm. It is
// transformed from inputRDD
val schemaRDD = hiveContext.applySchema(purchaseRDD, scm)
schemaRDD.registerTempTable("t_purchase")

Here's the stackTrace: 
scala.MatchError: ArrayType(StructType(List(StructField(date,StringType,
true ), StructField(n_reachat,IntegerType, true ))),true) (of class
org.apache.spark.sql.catalyst.types.ArrayType) 
at
org.apache.spark.sql.catalyst.expressions.Cast.cast$lzycompute(Cast.scala:247) 
at
org.apache.spark.sql.catalyst.expressions.Cast.cast(Cast.scala:247) 
at
org.apache.spark.sql.catalyst.expressions.Cast.eval(Cast.scala:263) 
at
org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:84) 
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:66)
 
at
org.apache.spark.sql.catalyst.expressions.InterpretedMutableProjection.apply(Projection.scala:50)
 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:149)
 
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
 
at
org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$1.apply(InsertIntoHiveTable.scala:158)
 
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
at org.apache.spark.scheduler.Task.run(Task.scala:54) 
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:744) 

The strange thing is that nullable for the date and nbPurchase fields shows
up as true, while it was false in the code. If I set both to true, it works.
But, in fact, they should not be nullable.

Here's what I find at Cast.scala:247 on 1.1.0 branch 

  private[this] lazy val cast: Any => Any = dataType match {
    case StringType => castToString
    case BinaryType => castToBinary
    case DecimalType => castToDecimal
    case TimestampType => castToTimestamp
    case BooleanType => castToBoolean
    case ByteType => castToByte
    case ShortType => castToShort
    case IntegerType => castToInt
    case FloatType => castToFloat
    case LongType => castToLong
    case DoubleType => castToDouble
  }

Any idea? Thank you. 

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/scala-MatchError-on-SparkSQL-when-creating-ArrayType-of-StructType-tp20459.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: EC2 cluster with SSD ebs

2014-11-24 Thread Hao Ren
Hi,

I found that the ec2 script has been improved a lot.

The option --ebs-vol-type has been added to specify the EBS volume type.

However, the option does not seem to work. The command I used is the
following:

$SPARK_HOME/ec2/spark-ec2 -k sparkcv -i spark.pem -m r3.4xlarge -s 3 -t
r3.2xlarge --ebs-vol-type=gp2 --ebs-vol-size=200 --copy-aws-credentials
launch spark-cluster

When checking AWS EC2 console, I find 'standard' as the volume type.

Any idea ?

Thank you. =)

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EC2-cluster-with-SSD-ebs-tp19474p19642.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Why is ALS class serializable ?

2014-11-21 Thread Hao Ren
It makes sense.

Thx. =)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-ALS-class-serializable-tp19262p19472.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Why is ALS class serializable ?

2014-11-19 Thread Hao Ren
Hi,

When reading through ALS code, I find that:

class ALS private (
private var numUserBlocks: Int,
private var numProductBlocks: Int,
private var rank: Int,
private var iterations: Int,
private var lambda: Double,
private var implicitPrefs: Boolean,
private var alpha: Double,
private var seed: Long = System.nanoTime()
  ) extends Serializable with Logging

Why should ALS extend Serializable?

If it doesn't, there is an exception: task not serializable, ALS is not
serializable.
I did not find any closure in which ALS is referenced.
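
For what it's worth, a minimal sketch (not the actual ALS code) of how a
class ends up captured without any obvious closure referencing it: using a
field or constructor parameter inside an RDD operation captures `this`, so
the whole class must be serializable.

import org.apache.spark.rdd.RDD

class Scaler(factor: Double) extends Serializable {
  // `factor` is a field of Scaler, so the lambda captures `this`;
  // without `extends Serializable` this fails with "Task not serializable".
  def scale(rdd: RDD[Double]): RDD[Double] = rdd.map(x => x * factor)
}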

Any idea ? Thx.

Hao








--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-ALS-class-serializable-tp19262.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Understanding spark operation pipeline and block storage

2014-11-19 Thread Hao Ren
Anyone has idea on this ?

Thx



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201p19263.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Building Spark with hive does not work

2014-11-18 Thread Hao Ren
nvm, it would be better if the correctness of the flags could be checked by
sbt during the build.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-with-hive-does-not-work-tp19072p19183.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Building Spark with hive does not work

2014-11-17 Thread Hao Ren
Hi,

I am building spark on the most recent master branch.

I checked this page:
https://github.com/apache/spark/blob/master/docs/sql-programming-guide.md

The cmd *./sbt/sbt -Phive -Phive-thirftserver clean assembly/assembly* works
fine. A fat jar is created.

However, when I started the SQL-CLI, I encountered an exception:

Spark assembly has been built with Hive, including Datanucleus jars on
classpath
java.lang.ClassNotFoundException:
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:337)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Failed to load main class
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.
*You need to build Spark with -Phive and -Phive-thriftserver.*
Using Spark's default log4j profile:
org/apache/spark/log4j-defaults.properties

It's suggested to do with -Phive and -Phive-thriftserver, which is actually
what I have done.

Any idea ?

Hao




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-with-hive-does-not-work-tp19072.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Building Spark with hive does not work

2014-11-17 Thread Hao Ren
Sry for spamming,

Just after my previous post, I noticed that the command used is:

./sbt/sbt -Phive -Phive-thirftserver clean assembly/assembly

thriftserver* 

the typo is the evil. Stupid me.

I believe I just copy-pasted it from somewhere else and never checked it;
meanwhile, no error msg such as "no such option" was displayed, which made
me assume the flags were correct.

Sry for the carelessness.

Hao




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Building-Spark-with-hive-does-not-work-tp19072p19087.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Understanding spark operation pipeline and block storage

2014-11-10 Thread Hao Ren

Hey, guys

Feel free to ask for more details if my questions are not clear.

Any insight here ?





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201p18496.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Understanding spark operation pipeline and block storage

2014-11-05 Thread Hao Ren
Hi,

I would like to understand the pipeline of Spark's operations (transformations
and actions) and some details on block storage.

Let's consider the following code:

val rdd1 = sc.textFile("hdfs://...")
rdd1.map(func1).map(func2).count

For example, we have a file in HDFS of about 80 GB, already split into 32
files, each 2.5 GB.

q1) How many partitions will rdd1 have?
rule 1) Maybe 32, since there are 32 split files? In most cases this rule
holds when the files are not too big.
rule 2) Maybe more; I am not sure whether Spark's block store can hold a
2.5 GB partition. Is there some parameter specifying the block store size?
AFAIK, the HDFS block size is what Spark uses to read data from HDFS, so
there would be (80 GB / hdfsBlockSize) partitions in rdd1, right? Usually the
HDFS block size is 64 MB, so we would get 80 GB / 64 MB = 1280 partitions.
Too many?

Which criterion does it take: the number of split files or the HDFS block
size?
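
As a side note, a small sketch (the path and numbers are illustrative) of how
I check and control the partition count directly instead of guessing from the
rules above:

// minPartitions is only a lower-bound hint for textFile.
val rdd1 = sc.textFile("hdfs://...", minPartitions = 1280)
println(rdd1.partitions.size)        // the number of partitions actually chosen

// Explicit control after loading: repartition shuffles, coalesce does not.
val wide   = rdd1.repartition(1280)
val narrow = rdd1.coalesce(32)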

q2) Here, func1 and func2 are added to the DAG sequentially. What is the
workflow at the partition level?
option1: Given a partition, func1 and func2 are applied to each element
in this partition sequentially. Once everything is done, we count the number
of lines in the partition and send the count to the driver. Then we take the
next partition and do the same thing?
option2: Or else, we apply func1 to all the partitions first, then apply
func2 to all the partitions that have had func1 applied, count the number of
lines in each partition and send the result to the driver?

I have done some tests, and it seems that option1 is correct. Can anyone
confirm this?
So in option 1, we have 1 job (count) which contains 3 stages: map(func1),
map(func2), count.

q3) What if we run out of memory?

Suppose we have 12 cores and 15 GB of memory in the cluster.

Case 1:
For example, func1 takes one line of the file and creates a big object
for each line, so a partition with func1 applied becomes a large
partition. If we have 12 cores in the cluster, that means we may have 12
large partitions in memory. What if these partitions are much bigger than
memory? What will happen? An OOM / heap size exception, etc.?

Case 2:
Suppose the input is 80 GB, but we force the RDD to be repartitioned into 6
partitions, which is smaller than the number of cores. Normally, each
partition is sent to a core, so all the input would be in memory. However, we
only have 15 GB of memory in the cluster. What will happen? An OOM exception?
Then, could we just split the RDD into more partitions so that
80 GB / #partitions * 12 (the # of cores) < 15 GB (the memory size)?
Meanwhile, we cannot split into too many partitions, which leads to some
overhead in task distribution.

If we read data from HDFS using the 64 MB HDFS block size as the partition
size, we get a formula like:
64 MB * # of cores < memory
which in most cases is true. Could this explain why reading from HDFS with
the block size does not lead to OOM as in case 2, even if the data is very
big in size?
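
To make the formula concrete (purely illustrative arithmetic):

val inputBytes = 80L * 1024 * 1024 * 1024   // 80 GB of input
val hdfsBlock  = 64L * 1024 * 1024          // 64 MB HDFS block size
val cores      = 12
val partitions = inputBytes / hdfsBlock     // = 1280 partitions
val inFlight   = hdfsBlock * cores          // = 768 MB, well under 15 GB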

Sorry for making this post a bit long. Hope I make myself clear.
Any help on any question will be appreciated.

Thank you.

Hao.













--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Understanding-spark-operation-pipeline-and-block-storage-tp18201.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



SparkSQL: set hive.metastore.warehouse.dir in CLI doesn't work

2014-10-15 Thread Hao Ren
Hi,

The following query in sparkSQL 1.1.0 CLI doesn't work.

SET hive.metastore.warehouse.dir=/home/spark/hive/warehouse
;

create table test as
select v1.*, v2.card_type, v2.card_upgrade_time_black,
v2.card_upgrade_time_gold
from customer v1 left join customer_loyalty v2
on v1.account_id = v2.account_id
limit 5
;

StackTrack =

org.apache.hadoop.hive.ql.metadata.HiveException:
MetaException(*message:file:/user/hive/warehouse/test* is not a directory or
unable to create one)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:602)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:559)
at
org.apache.spark.sql.hive.HiveMetastoreCatalog.createTable(HiveMetastoreCatalog.scala:99)
at
org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:116)
at
org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$$anonfun$apply$1.applyOrElse(HiveMetastoreCatalog.scala:111)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:165)
at
org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:156)
at
org.apache.spark.sql.hive.HiveMetastoreCatalog$CreateTables$.apply(HiveMetastoreCatalog.scala:111)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan$lzycompute(HiveContext.scala:358)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.optimizedPlan(HiveContext.scala:357)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:402)
at
org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:400)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:406)
at
org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:406)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd$lzycompute(HiveContext.scala:360)
at
org.apache.spark.sql.hive.HiveContext$QueryExecution.toRdd(HiveContext.scala:360)
at 
org.apache.spark.sql.SchemaRDDLike$class.$init$(SchemaRDDLike.scala:58)
at org.apache.spark.sql.SchemaRDD.init(SchemaRDD.scala:103)
at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:98)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:58)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:291)
at org.apache.hadoop.hive.cli.CliDriver.processLine(CliDriver.java:413)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:226)
at
org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: MetaException(message:file:/user/hive/warehouse/test is not a
directory or unable to create one)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_core(HiveMetaStore.java:1060)
at
org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.create_table_with_environment_context(HiveMetaStore.java:1107)
at sun.reflect.GeneratedMethodAccessor133.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:103)
at com.sun.proxy.$Proxy15.create_table_with_environment_context(Unknown
Source)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:482)
at
org.apache.hadoop.hive.metastore.HiveMetaStoreClient.createTable(HiveMetaStoreClient.java:471)
at sun.reflect.GeneratedMethodAccessor132.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.invoke(RetryingMetaStoreClient.java:89)
at com.sun.proxy.$Proxy16.createTable(Unknown Source)
at org.apache.hadoop.hive.ql.metadata.Hive.createTable(Hive.java:596)
... 30 more

It seems that the CLI doesn't take the hive.metastore.warehouse.dir value
when creating a table with "create table ... as select".
If I just create the table, like create table t (...), and then load

Re: SparkSQL: select syntax

2014-10-14 Thread Hao Ren
Update:

This syntax is mainly for avoiding retyping column names.

Let's take the example from my previous post, where *a* is a table of 15
columns and *b* has 5 columns. After a join, I have a table of (15 + 5 - 1
(the key in b)) = 19 columns, and I register that table in sqlContext.

I don't want to retype all 19 column names when querying with select. This
feature exists in Hive,
but in SparkSQL it gives an exception.

Any ideas ? Thx

Hao



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-select-syntax-tp16299p16364.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SparkSQL: select syntax

2014-10-14 Thread Hao Ren
Thank you, Gen.

I will give hiveContext a try. =)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-select-syntax-tp16299p16368.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org