Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-02-01 Thread Jerry Lam
Hi Koert,

Thank you for your help! GOT IT!

Best Regards,

Jerry

On Wed, Feb 1, 2017 at 6:24 PM, Koert Kuipers <ko...@tresata.com> wrote:

> you can still use it as Dataset[Set[X]]. all transformations should work
> correctly.
>
> however dataset.schema will show binary type, and dataset.show will show
> bytes (unfortunately).
>
> for example:
>
> scala> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
> setEncoder: [X]=> org.apache.spark.sql.Encoder[Set[X]]
>
> scala> val x = Seq(Set(1,2,3)).toDS
> x: org.apache.spark.sql.Dataset[scala.collection.immutable.Set[Int]] =
> [value: binary]
>
> scala> x.map(_ + 4).collect
> res17: Array[scala.collection.immutable.Set[Int]] = Array(Set(1, 2, 3, 4))
>
> scala> x.show
> ++
> |   value|
> ++
> |[2A 01 03 02 02 0...|
> ++
>
>
> scala> x.schema
> res19: org.apache.spark.sql.types.StructType =
> StructType(StructField(value,BinaryType,true))
>
>
> On Wed, Feb 1, 2017 at 12:03 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Koert,
>>
>> Thanks for the tips. I tried to do that but the column's type is now
>> Binary. Do I get the Set[X] back in the Dataset?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Tue, Jan 31, 2017 at 8:04 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> set is currently not supported. you can use kryo encoder. there is no
>>> other work around that i know of.
>>>
>>> import org.apache.spark.sql.{ Encoder, Encoders }
>>> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
>>>
>>> On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi guys,
>>>>
>>>> I got an exception like the following, when I tried to implement a user
>>>> defined aggregation function.
>>>>
>>>>  Exception in thread "main" java.lang.UnsupportedOperationException:
>>>> No Encoder found for Set[(scala.Long, scala.Long)]
>>>>
>>>> The Set[(Long, Long)] is a field in the case class which is the output
>>>> type for the aggregation.
>>>>
>>>> Is there a workaround for this?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>
>>>
>>
>


using withWatermark on Dataset

2017-02-01 Thread Jerry Lam
Hi everyone,

Anyone knows how to use withWatermark  on Dataset?

I have tried the following but hit this exception:

dataset org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
cannot be cast to "MyType"

The code looks like the following:

dataset
.withWatermark("timestamp", "5 seconds")
.groupBy("timestamp", "customer_id")
.agg(MyAggregator)
.writeStream

Where dataset has MyType for each row.
Where MyType is:
case class MyTpe(customer_id: Long, timestamp: Timestamp, product_id: Long)

MyAggregator which takes MyType as the input type did some maths on the
product_id and outputs a set of product_ids.

Best Regards,

Jerry


Re: Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-02-01 Thread Jerry Lam
Hi Koert,

Thanks for the tips. I tried to do that but the column's type is now
Binary. Do I get the Set[X] back in the Dataset?

Best Regards,

Jerry


On Tue, Jan 31, 2017 at 8:04 PM, Koert Kuipers <ko...@tresata.com> wrote:

> set is currently not supported. you can use kryo encoder. there is no
> other work around that i know of.
>
> import org.apache.spark.sql.{ Encoder, Encoders }
> implicit def setEncoder[X]: Encoder[Set[X]] = Encoders.kryo[Set[X]]
>
> On Tue, Jan 31, 2017 at 7:33 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I got an exception like the following, when I tried to implement a user
>> defined aggregation function.
>>
>>  Exception in thread "main" java.lang.UnsupportedOperationException: No
>> Encoder found for Set[(scala.Long, scala.Long)]
>>
>> The Set[(Long, Long)] is a field in the case class which is the output
>> type for the aggregation.
>>
>> Is there a workaround for this?
>>
>> Best Regards,
>>
>> Jerry
>>
>
>


Dataset Question: No Encoder found for Set[(scala.Long, scala.Long)]

2017-01-31 Thread Jerry Lam
Hi guys,

I got an exception like the following, when I tried to implement a user
defined aggregation function.

 Exception in thread "main" java.lang.UnsupportedOperationException: No
Encoder found for Set[(scala.Long, scala.Long)]

The Set[(Long, Long)] is a field in the case class which is the output type
for the aggregation.

Is there a workaround for this?

Best Regards,

Jerry


Re: Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Jerry Lam
Hi Sean,

I agree there is no need for that if the implementation actually assigns
c=1 for all missing ratings but from the current implementation of ALS, I
don't think it is doing that.
The idea is that for missing ratings, they are assigned to c=1 (in the
paper) and they do contribute to the optimization of equation (3).

The lines of code that I'm referring to is:

{code}
if (implicitPrefs) {
  // Extension to the original paper to handle b < 0.
confidence is a function of |b|
  // instead so that it is never negative. c1 is confidence -
1.0.
  val c1 = alpha * math.abs(rating)
  // For rating <= 0, the corresponding preference is 0. So the
term below is only added
  // for rating > 0. Because YtY is already added, we need to
adjust the scaling here.
  if (rating > 0) {
numExplicits += 1
ls.add(srcFactor, (c1 + 1.0) / c1, c1)
  }
} else {
  ls.add(srcFactor, rating)
  numExplicits += 1
}
{code}

Regards,

Jerry


On Mon, Dec 5, 2016 at 3:27 PM, Sean Owen <so...@cloudera.com> wrote:

> That doesn't mean this 0 value is literally included in the input. There's
> no need for that.
>
> On Tue, Dec 6, 2016 at 4:24 AM Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Sean,
>>
>> I'm referring to the paper (http://yifanhu.net/PUB/cf.pdf) Section 2:
>> " However, with implicit feedback it would be natural to assign values to
>> all rui variables. If no action was observed rui is set to zero, thus
>> meaning in our examples zero watching time, or zero purchases on record."
>>
>> In the implicit setting, apparently there should have values for all
>> pairs (u, i) instead of just the observed ones according to the paper. This
>> is also true for other implicit feedback papers I read.
>>
>> In section 4, when r=0, p=0 BUT c=1. Therefore, when we optimize the
>> value for this pair. (x^Ty)^2 + regularization.
>>
>> Do I misunderstand the paper?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Mon, Dec 5, 2016 at 2:43 PM, Sean Owen <so...@cloudera.com> wrote:
>>
>> What are you referring to in what paper? implicit input would never
>> materialize 0s for missing values.
>>
>> On Tue, Dec 6, 2016 at 3:42 AM Jerry Lam <chiling...@gmail.com> wrote:
>>
>> Hello spark users and developers,
>>
>> I read the paper from Yahoo about CF with implicit feedback and other
>> papers using implicit feedbacks. Their implementation require to set the
>> missing rating with 0. That is for unobserved ratings, the confidence for
>> those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
>> matrix.
>>
>> I read the source code of the ALS implementation in spark (version 1.6.x)
>> for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
>> in ALS.scala). It could be a mistake or it could be an optimization. Just
>> want to see if anyone steps on this yet.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>


Re: Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Jerry Lam
Hi Sean,

I'm referring to the paper (http://yifanhu.net/PUB/cf.pdf) Section 2:
" However, with implicit feedback it would be natural to assign values to
all rui variables. If no action was observed rui is set to zero, thus
meaning in our examples zero watching time, or zero purchases on record."

In the implicit setting, apparently there should have values for all pairs
(u, i) instead of just the observed ones according to the paper. This is
also true for other implicit feedback papers I read.

In section 4, when r=0, p=0 BUT c=1. Therefore, when we optimize the value
for this pair. (x^Ty)^2 + regularization.

Do I misunderstand the paper?

Best Regards,

Jerry


On Mon, Dec 5, 2016 at 2:43 PM, Sean Owen <so...@cloudera.com> wrote:

> What are you referring to in what paper? implicit input would never
> materialize 0s for missing values.
>
> On Tue, Dec 6, 2016 at 3:42 AM Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hello spark users and developers,
>>
>> I read the paper from Yahoo about CF with implicit feedback and other
>> papers using implicit feedbacks. Their implementation require to set the
>> missing rating with 0. That is for unobserved ratings, the confidence for
>> those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
>> matrix.
>>
>> I read the source code of the ALS implementation in spark (version 1.6.x)
>> for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
>> in ALS.scala). It could be a mistake or it could be an optimization. Just
>> want to see if anyone steps on this yet.
>>
>> Best Regards,
>>
>> Jerry
>>
>


Collaborative Filtering Implicit Feedback Impl.

2016-12-05 Thread Jerry Lam
Hello spark users and developers,

I read the paper from Yahoo about CF with implicit feedback and other
papers using implicit feedbacks. Their implementation require to set the
missing rating with 0. That is for unobserved ratings, the confidence for
those is set to 1 (c=1). Therefore, the matrix to be factorized is a dense
matrix.

I read the source code of the ALS implementation in spark (version 1.6.x)
for implicit feedback. Apparently, it ignores rating that is 0 (Line 1159
in ALS.scala). It could be a mistake or it could be an optimization. Just
want to see if anyone steps on this yet.

Best Regards,

Jerry


[Spark SQL]: UDF with Array[Double] as input

2016-04-01 Thread Jerry Lam
Hi spark users and developers,

Anyone tried to pass in an Array[Double] as a input to the UDF? I tried it
for many hours reading spark sql code but IK still couldn't figure out a
way to do this.

Best Regards,

Jerry


Re: [Spark SQL] Unexpected Behaviour

2016-03-29 Thread Jerry Lam
Hi guys,

Another point is that if this is unsupported shouldn't it throw an
exception instead of giving the wrong answer? I mean if
d1.join(d2, "id").select(d2("label")) should not work at all, the proper
behaviour is to throw the analysis exception. It now returns a wrong answer
though.

As I said, this is just a tip of iceberg. I have experience worsen than
this. For example, you might think renaming fields will work but in some
cases, it still returns wrong results.

Best Regards,

Jerry

On Tue, Mar 29, 2016 at 7:38 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Divya,
>
> This is not a self-join. d1 and d2 contain totally different rows. They
> are derived from the same table. The transformation that are applied to
> generate d1 and d2 should be able to disambiguate the labels in the
> question.
>
>
> Best Regards,
>
> Jerry
>
>
> On Tue, Mar 29, 2016 at 2:43 AM, Divya Gehlot <divya.htco...@gmail.com>
> wrote:
>
>>
>> def join(right: DataFrame, joinExprs: Column, joinType: String):
>> DataFrame = {
>> // Note that in this function, we introduce a hack in the case of
>> self-join to automatically
>> // resolve ambiguous join conditions into ones that might make sense
>> [SPARK-6231].
>> // Consider this case: df.join(df, df("key") === df("key"))
>> // Since df("key") === df("key") is a trivially true condition, this
>> actually becomes a
>> // cartesian join. However, most likely users expect to perform a self
>> join using "key".
>> // With that assumption, this hack turns the trivially true condition
>> into equality on join
>> // keys that are resolved to both sides.
>> // Trigger analysis so in the case of self-join, the analyzer will clone
>> the plan.
>> // After the cloning, left and right side will have distinct expression
>> ids.
>>
>> On 29 March 2016 at 14:33, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> I have another example to illustrate the issue. I think the problem is
>>> pretty nasty.
>>>
>>> val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50
>>> to 99)).toDF("id", "label")
>>> val d1 = base.where($"label" < 60)
>>> val d2 = base.where($"label" === 60)
>>> d1.join(d2, "id").show
>>> +---+-+-+
>>> | id|label|label|
>>> +---+-+-+
>>> | 40|   40|   60|
>>> +---+-+-+
>>>
>>> d1.join(d2, "id").select(d1("label")).show
>>> +-+
>>> |label|
>>> +-+
>>> |   40|
>>> +-+
>>> (expected answer: 40, right!)
>>>
>>> d1.join(d2, "id").select(d2("label")).show
>>> +-+
>>> |label|
>>> +-+
>>> |   40|
>>> +-+
>>> (expected answer: 60, wrong!)
>>>
>>> d1.join(d2, "id").select(d2("label")).explain
>>> == Physical Plan ==
>>> TungstenProject [label#15]
>>>  SortMergeJoin [id#14], [id#30]
>>>   TungstenSort [id#14 ASC], false, 0
>>>TungstenExchange hashpartitioning(id#14)
>>> TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
>>>  Filter (_2#13 < 60)
>>>   Scan PhysicalRDD[_1#12,_2#13]
>>>   TungstenSort [id#30 ASC], false, 0
>>>TungstenExchange hashpartitioning(id#30)
>>> TungstenProject [_1#12 AS id#30]
>>>  Filter (_2#13 = 60)
>>>   Scan PhysicalRDD[_1#12,_2#13]
>>>
>>> Again, this is just a tip of the iceberg. I have spent hours to find out
>>> this weird behaviour.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>> On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi Sunitha,
>>>>
>>>> Thank you for the reference Jira. It looks like this is the bug I'm
>>>> hitting. Most of the bugs related to this seems to associate with
>>>> dataframes derived from the one dataframe (base in this case). In SQL, this
>>>> is a self-join and dropping d2.label should not affect d1.label. There are
>>>> other bugs I found these three days that are associated with this type of
>>>> joins. In one case, if I don't drop the duplicate column BEFORE the join,
>>>> spark has preferences on the columns from d2 da

Re: [Spark SQL] Unexpected Behaviour

2016-03-29 Thread Jerry Lam
Hi Divya,

This is not a self-join. d1 and d2 contain totally different rows. They are
derived from the same table. The transformation that are applied to
generate d1 and d2 should be able to disambiguate the labels in the
question.


Best Regards,

Jerry


On Tue, Mar 29, 2016 at 2:43 AM, Divya Gehlot <divya.htco...@gmail.com>
wrote:

>
> def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame
> = {
> // Note that in this function, we introduce a hack in the case of
> self-join to automatically
> // resolve ambiguous join conditions into ones that might make sense
> [SPARK-6231].
> // Consider this case: df.join(df, df("key") === df("key"))
> // Since df("key") === df("key") is a trivially true condition, this
> actually becomes a
> // cartesian join. However, most likely users expect to perform a self
> join using "key".
> // With that assumption, this hack turns the trivially true condition into
> equality on join
> // keys that are resolved to both sides.
> // Trigger analysis so in the case of self-join, the analyzer will clone
> the plan.
> // After the cloning, left and right side will have distinct expression
> ids.
>
> On 29 March 2016 at 14:33, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi guys,
>>
>> I have another example to illustrate the issue. I think the problem is
>> pretty nasty.
>>
>> val base = sc.parallelize(( 0 to 49).zip( 0 to 49) ++ (30 to 79).zip(50
>> to 99)).toDF("id", "label")
>> val d1 = base.where($"label" < 60)
>> val d2 = base.where($"label" === 60)
>> d1.join(d2, "id").show
>> +---+-+-+
>> | id|label|label|
>> +---+-+-+
>> | 40|   40|   60|
>> +---+-+-+
>>
>> d1.join(d2, "id").select(d1("label")).show
>> +-+
>> |label|
>> +-+
>> |   40|
>> +-+
>> (expected answer: 40, right!)
>>
>> d1.join(d2, "id").select(d2("label")).show
>> +-+
>> |label|
>> +-+
>> |   40|
>> +-+
>> (expected answer: 60, wrong!)
>>
>> d1.join(d2, "id").select(d2("label")).explain
>> == Physical Plan ==
>> TungstenProject [label#15]
>>  SortMergeJoin [id#14], [id#30]
>>   TungstenSort [id#14 ASC], false, 0
>>TungstenExchange hashpartitioning(id#14)
>> TungstenProject [_1#12 AS id#14,_2#13 AS label#15]
>>  Filter (_2#13 < 60)
>>   Scan PhysicalRDD[_1#12,_2#13]
>>   TungstenSort [id#30 ASC], false, 0
>>TungstenExchange hashpartitioning(id#30)
>> TungstenProject [_1#12 AS id#30]
>>  Filter (_2#13 = 60)
>>   Scan PhysicalRDD[_1#12,_2#13]
>>
>> Again, this is just a tip of the iceberg. I have spent hours to find out
>> this weird behaviour.
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> Best Regards,
>>
>> Jerry
>>
>> On Tue, Mar 29, 2016 at 2:01 AM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi Sunitha,
>>>
>>> Thank you for the reference Jira. It looks like this is the bug I'm
>>> hitting. Most of the bugs related to this seems to associate with
>>> dataframes derived from the one dataframe (base in this case). In SQL, this
>>> is a self-join and dropping d2.label should not affect d1.label. There are
>>> other bugs I found these three days that are associated with this type of
>>> joins. In one case, if I don't drop the duplicate column BEFORE the join,
>>> spark has preferences on the columns from d2 dataframe. I will see if I can
>>> replicate in a small program like above.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati <
>>> skambha...@gmail.com> wrote:
>>>
>>>> Hi Jerry,
>>>>
>>>> I think you are running into an issue similar to SPARK-14040
>>>> https://issues.apache.org/jira/browse/SPARK-14040
>>>>
>>>> One way to resolve it is to use alias.
>>>>
>>>> Here is an example that I tried on trunk and I do not see any
>>>> exceptions.
>>>>
>>>> val d1=base.where($"label" === 0) as("d1")
>>>> val d2=base.where($"label" === 1).as("d2")
>>>>
>>>> d1.join(d2, $"d1.id" === $"d2.id", 
>>>> "left_outer").drop($"d2.label").select($"d1.label")
>>>>
>>>>
>>>> Hope this helps some.
>>>>
>>>> Best regards,
>>>> Sunitha.
>>>>
>>>> On Mar 28, 2016, at 2:34 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>
>>>> Hi spark users and developers,
>>>>
>>>> I'm using spark 1.5.1 (I have no choice because this is what we used).
>>>> I ran into some very unexpected behaviour when I did some join operations
>>>> lately. I cannot post my actual code here and the following code is not for
>>>> practical reasons but it should demonstrate the issue.
>>>>
>>>> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
>>>> 99).map((_,1))).toDF("id", "label")
>>>> val d1=base.where($"label" === 0)
>>>> val d2=base.where($"label" === 1)
>>>> d1.join(d2, d1("id") === d2("id"),
>>>> "left_outer").drop(d2("label")).select(d1("label"))
>>>>
>>>>
>>>> The above code will throw an exception saying the column label is not
>>>> found. Do you have a reason for throwing an exception when the column has
>>>> not been dropped for d1("label")?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>
>>
>


Re: [Spark SQL] Unexpected Behaviour

2016-03-29 Thread Jerry Lam
Hi Sunitha,

Thank you for the reference Jira. It looks like this is the bug I'm
hitting. Most of the bugs related to this seems to associate with
dataframes derived from the one dataframe (base in this case). In SQL, this
is a self-join and dropping d2.label should not affect d1.label. There are
other bugs I found these three days that are associated with this type of
joins. In one case, if I don't drop the duplicate column BEFORE the join,
spark has preferences on the columns from d2 dataframe. I will see if I can
replicate in a small program like above.

Best Regards,

Jerry


On Mon, Mar 28, 2016 at 6:27 PM, Sunitha Kambhampati <skambha...@gmail.com>
wrote:

> Hi Jerry,
>
> I think you are running into an issue similar to SPARK-14040
> https://issues.apache.org/jira/browse/SPARK-14040
>
> One way to resolve it is to use alias.
>
> Here is an example that I tried on trunk and I do not see any exceptions.
>
> val d1=base.where($"label" === 0) as("d1")
> val d2=base.where($"label" === 1).as("d2")
>
> d1.join(d2, $"d1.id" === $"d2.id", 
> "left_outer").drop($"d2.label").select($"d1.label")
>
>
> Hope this helps some.
>
> Best regards,
> Sunitha.
>
> On Mar 28, 2016, at 2:34 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
> Hi spark users and developers,
>
> I'm using spark 1.5.1 (I have no choice because this is what we used). I
> ran into some very unexpected behaviour when I did some join operations
> lately. I cannot post my actual code here and the following code is not for
> practical reasons but it should demonstrate the issue.
>
> val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
> 99).map((_,1))).toDF("id", "label")
> val d1=base.where($"label" === 0)
> val d2=base.where($"label" === 1)
> d1.join(d2, d1("id") === d2("id"),
> "left_outer").drop(d2("label")).select(d1("label"))
>
>
> The above code will throw an exception saying the column label is not
> found. Do you have a reason for throwing an exception when the column has
> not been dropped for d1("label")?
>
> Best Regards,
>
> Jerry
>
>
>


[Spark SQL] Unexpected Behaviour

2016-03-28 Thread Jerry Lam
Hi spark users and developers,

I'm using spark 1.5.1 (I have no choice because this is what we used). I
ran into some very unexpected behaviour when I did some join operations
lately. I cannot post my actual code here and the following code is not for
practical reasons but it should demonstrate the issue.

val base = sc.parallelize(( 0 to 49).map(i =>(i,0)) ++ (50 to
99).map((_,1))).toDF("id", "label")
val d1=base.where($"label" === 0)
val d2=base.where($"label" === 1)
d1.join(d2, d1("id") === d2("id"),
"left_outer").drop(d2("label")).select(d1("label"))


The above code will throw an exception saying the column label is not
found. Do you have a reason for throwing an exception when the column has
not been dropped for d1("label")?

Best Regards,

Jerry


Pattern Matching over a Sequence of rows using Spark

2016-02-28 Thread Jerry Lam
Hi spark users and developers,

Anyone has experience developing pattern matching over a sequence of rows
using Spark? I'm talking about functionality similar to matchpath in Hive
or match_recognize in Oracle DB. It is used for path analysis on
clickstream data. If you know of any libraries that do that, please share
your findings!

Thank you,

Jerry


Re: Streaming with broadcast joins

2016-02-19 Thread Jerry Lam
Hi guys,

I also encounter broadcast dataframe issue not for steaming jobs but regular 
dataframe join. In my case, the executors died probably due to OOM which I 
don't think it should use that much memory. Anyway, I'm going to craft an 
example and send it here to see if it is a bug or something I've misunderstood.

Best Regards,

Jerry

Sent from my iPhone

> On 19 Feb, 2016, at 10:20 am, Sebastian Piu  wrote:
> 
> I don't have the code with me now, and I ended moving everything to RDD in 
> the end and using map operations to do some lookups, i.e. instead of 
> broadcasting a Dataframe I ended broadcasting a Map 
> 
> 
>> On Fri, Feb 19, 2016 at 11:39 AM Srikanth  wrote:
>> It didn't fail. It wasn't broadcasting. I just ran the test again and here 
>> are the logs.
>> Every batch is reading the metadata file.
>> 
>>  16/02/19 06:27:02 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:0+27
>>  16/02/19 06:27:02 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:27+28
>> 
>>  16/02/19 06:27:40 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:27+28
>>  16/02/19 06:27:40 INFO HadoopRDD: Input split: 
>> file:/shared/data/test-data.txt:0+27
>> 
>> If I remember, foreachRDD is executed in the driver's context. Not sure how 
>> we'll be able to achieve broadcast in this approach(unless we use SQL 
>> broadcast hint again)
>> 
>> When you say "it worked before",  was it with an older version of spark? I'm 
>> trying this on 1.6.
>> If you still have the streaming job running can you verify in spark UI that 
>> broadcast join is being used. Also, if the files are read and broadcasted 
>> each batch??
>> 
>> Thanks for the help!
>> 
>> 
>>> On Fri, Feb 19, 2016 at 3:49 AM, Sebastian Piu  
>>> wrote:
>>> I don't see anything obviously wrong on your second approach, I've done it 
>>> like that before and it worked. When you say that it didn't work what do 
>>> you mean? did it fail? it didnt broadcast? 
>>> 
 On Thu, Feb 18, 2016 at 11:43 PM Srikanth  wrote:
 Code with SQL broadcast hint. This worked and I was able to see that 
 broadcastjoin was performed.
 
val testDF = sqlContext.read.format("com.databricks.spark.csv")

 .schema(schema).load("file:///shared/data/test-data.txt") 
 
val lines = ssc.socketTextStream("DevNode", )
 
lines.foreachRDD((rdd, timestamp) => {
val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, 
 l(1))).toDF()
val resultDF = recordDF.join(testDF, "Age")

 resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
 }
 
 But for every batch this file was read and broadcast was performed. 
 Evaluating the entire DAG I guess.
 16/02/18 12:24:02 INFO HadoopRDD: Input split: 
 file:/shared/data/test-data.txt:27+28
 16/02/18 12:24:02 INFO HadoopRDD: Input split: 
 file:/shared/data/test-data.txt:0+27
 
 16/02/18 12:25:00 INFO HadoopRDD: Input split: 
 file:/shared/data/test-data.txt:27+28
 16/02/18 12:25:00 INFO HadoopRDD: Input split: 
 file:/shared/data/test-data.txt:0+27
 
 
 Then I changed code to broadcast the dataframe. This didn't work either. 
 Not sure if this is what you meant by broadcasting a dataframe.
 
val testDF = 
 sc.broadcast(sqlContext.read.format("com.databricks.spark.csv")

 .schema(schema).load("file:///shared/data/test-data.txt") 
 )
 
val lines = ssc.socketTextStream("DevNode", )
 
lines.foreachRDD((rdd, timestamp) => {
val recordDF = rdd.map(_.split(",")).map(l => Record(l(0).toInt, 
 l(1))).toDF()
val resultDF = recordDF.join(testDF.value, "Age")

 resultDF.write.format("com.databricks.spark.csv").save("file:///shared/data/output/streaming/"+timestamp)
 }
 
 
> On Thu, Feb 18, 2016 at 12:55 PM, Sebastian Piu  
> wrote:
> Can you paste the code where you use sc.broadcast ?
> 
>> On Thu, Feb 18, 2016 at 5:32 PM Srikanth  wrote:
>> Sebastian,
>> 
>> I was able to broadcast using sql broadcast hint. Question is how to 
>> prevent this broadcast for each RDD.
>> Is there a way where it can be broadcast once and used locally for each 
>> RDD?
>> Right now every batch the metadata file is read and the DF is 
>> broadcasted.
>> I tried sc.broadcast and that did not provide this behavior.
>> 
>> Srikanth
>> 
>> 
>>> On Wed, Feb 17, 2016 at 4:53 PM, Sebastian Piu 
>>>  wrote:
>>> You should be able to broadcast that data frame using sc.broadcast 

Re: Convert Iterable to RDD

2016-02-12 Thread Jerry Lam
Not sure if I understand your problem well but why don't you create the file 
locally and then upload to hdfs?

Sent from my iPhone

> On 12 Feb, 2016, at 9:09 am, "seb.arzt"  wrote:
> 
> I have an Iterator of several million elements, which unfortunately won't fit
> into the driver memory at the same time. I would like to save them as object
> file in HDFS:
> 
> Doing so I am running out of memory on the driver:
> 
> Using a stream
> 
> also won't work. I cannot further increase the driver memory. Why doesn't it
> work out of the box? Shouldn't lazy evaluation and garbage collection
> prevent the program from running out of memory? I could manually split the
> Iterator into chunks and serialize each chunk, but it feels wrong. What is
> going wrong here?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Convert-Iterable-to-RDD-tp16882p26211.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark 1.5.2 memory error

2016-02-03 Thread Jerry Lam
Hi guys,

I was processing 300GB data with lot of joins today. I have a combination
of RDD->Dataframe->RDD due to legacy code. I have memory issues at the
beginning. After fine-tuning those configurations that many already
suggested above, it works with 0 task failed. I think it is fair to say any
memory intensive applications would face similar memory issue. It is not
very fair to say it sucks just because it has memory issues. The memory
issue comes in many forms such as 1. bad framework 2. bad code. 3. bad
framework and bad code. I usually blame bad code first, then bad framework.
If it is truly it fails because of the bad framework (mesos+spark+fine
grain mode = disaster), then make the code changes to adapt to the bad
framework.

I never see code that can magically run with 100% completion when data is
close to terabyte without some serious engineering efforts. A framework can
only help a bit but you are still responsible for making conscious
decisions on how much memory and data you are working with. For instance, a
k-v pair with v having 100GB and you allocate 1GB per executor, this is
going to blow up no matter how many times you execute it.

The memory/core is what I fine tune most. Making sure the task/core has
enough memory to execute to completion. Some times you really don't know
how much data you keep in memory until you profile your application.
(calculate some statistics help).

Best Regards,

Jerry



On Wed, Feb 3, 2016 at 4:58 PM, Nirav Patel  wrote:

> About OP.
>
> How many cores you assign per executor? May be reducing that number will
> give more portion of executor memory to each task being executed on that
> executor. Others please comment if that make sense.
>
>
>
> On Wed, Feb 3, 2016 at 1:52 PM, Nirav Patel  wrote:
>
>> I know it;s a strong word but when I have a case open for that with MapR
>> and Databricks for a month and their only solution to change to DataFrame
>> it frustrate you. I know DataFrame/Sql catalyst has internal optimizations
>> but it requires lot of code change. I think there's something fundamentally
>> wrong (or different from hadoop) in framework that is not allowing it to do
>> robust memory management. I know my job is memory hogger, it does a groupBy
>> and perform combinatorics in reducer side; uses additional datastructures
>> at task levels. May be spark is running multiple heavier tasks on same
>> executor and collectively they cause OOM. But suggesting DataFrame is NOT a
>> Solution for me (and most others who already invested time with RDD and
>> loves the type safety it provides). Not even sure if changing to DataFrame
>> will for sure solve the issue.
>>
>> On Wed, Feb 3, 2016 at 1:33 PM, Mohammed Guller 
>> wrote:
>>
>>> Nirav,
>>>
>>> Sorry to hear about your experience with Spark; however, sucks is a very
>>> strong word. Many organizations are processing a lot more than 150GB of
>>> data  with Spark.
>>>
>>>
>>>
>>> Mohammed
>>>
>>> Author: Big Data Analytics with Spark
>>> 
>>>
>>>
>>>
>>> *From:* Nirav Patel [mailto:npa...@xactlycorp.com]
>>> *Sent:* Wednesday, February 3, 2016 11:31 AM
>>> *To:* Stefan Panayotov
>>> *Cc:* Jim Green; Ted Yu; Jakob Odersky; user@spark.apache.org
>>>
>>> *Subject:* Re: Spark 1.5.2 memory error
>>>
>>>
>>>
>>> Hi Stefan,
>>>
>>>
>>>
>>> Welcome to the OOM - heap space club. I have been struggling with
>>> similar errors (OOM and yarn executor being killed) and failing job or
>>> sending it in retry loops. I bet the same job will run perfectly fine with
>>> less resource on Hadoop MapReduce program. I have tested it for my program
>>> and it does work.
>>>
>>>
>>>
>>> Bottomline from my experience. Spark sucks with memory management when
>>> job is processing large (not huge) amount of data. It's failing for me with
>>> 16gb executors, 10 executors, 6 threads each. And data its processing is
>>> only 150GB! It's 1 billion rows for me. Same job works perfectly fine with
>>> 1 million rows.
>>>
>>>
>>>
>>> Hope that saves you some trouble.
>>>
>>>
>>>
>>> Nirav
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Wed, Feb 3, 2016 at 11:00 AM, Stefan Panayotov 
>>> wrote:
>>>
>>> I drastically increased the memory:
>>>
>>> spark.executor.memory = 50g
>>> spark.driver.memory = 8g
>>> spark.driver.maxResultSize = 8g
>>> spark.yarn.executor.memoryOverhead = 768
>>>
>>> I still see executors killed, but this time the memory does not seem to
>>> be the issue.
>>> The error on the Jupyter notebook is:
>>>
>>>
>>> Py4JJavaError: An error occurred while calling 
>>> z:org.apache.spark.api.python.PythonRDD.collectAndServe.
>>>
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: 
>>> Exception while getting task result: java.io.IOException: Failed to connect 
>>> to /10.0.0.9:48755
>>>
>>>
>>> From nodemanagers log corresponding to worker 10.0.0.9:
>>>

Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-02 Thread Jerry Lam
I think spark dataframe supports more than just SQL. It is more like pandas
dataframe.( I rarely use the SQL feature. )
There are a lot of novelties in dataframe so I think it is quite optimize
for many tasks. The in-memory data structure is very memory efficient. I
just change a very slow RDD program to use Dataframe. The performance gain
is about 2 times while using less CPU. Of course, if you are very good at
optimizing your code, then use pure RDD.


On Tue, Feb 2, 2016 at 8:08 PM, Koert Kuipers  wrote:

> Dataset will have access to some of the catalyst/tungsten optimizations
> while also giving you scala and types. However that is currently
> experimental and not yet as efficient as it could be.
> On Feb 2, 2016 7:50 PM, "Nirav Patel"  wrote:
>
>> Sure, having a common distributed query and compute engine for all kind
>> of data source is alluring concept to market and advertise and to attract
>> potential customers (non engineers, analyst, data scientist). But it's
>> nothing new!..but darn old school. it's taking bits and pieces from
>> existing sql and no-sql technology. It lacks many panache of robust sql
>> engine. I think what put spark aside from everything else on market is RDD!
>> and flexibility and scala-like programming style given to developers which
>> is simply much more attractive to write then sql syntaxes, schema and
>> string constants that falls apart left and right. Writing sql is old
>> school. period.  good luck making money though :)
>>
>> On Tue, Feb 2, 2016 at 4:38 PM, Koert Kuipers  wrote:
>>
>>> To have a product databricks can charge for their sql engine needs to be
>>> competitive. That's why they have these optimizations in catalyst. RDD is
>>> simply no longer the focus.
>>> On Feb 2, 2016 7:17 PM, "Nirav Patel"  wrote:
>>>
 so latest optimizations done on spark 1.4 and 1.5 releases are mostly
 from project Tungsten. Docs says it usues sun.misc.unsafe to convert
 physical rdd structure into byte array at some point for optimized GC and
 memory. My question is why is it only applicable to SQL/Dataframe and not
 RDD? RDD has types too!


 On Mon, Jan 25, 2016 at 11:10 AM, Nirav Patel 
 wrote:

> I haven't gone through much details of spark catalyst optimizer and
> tungston project but we have been advised by databricks support to use
> DataFrame to resolve issues with OOM error that we are getting during Join
> and GroupBy operations. We use spark 1.3.1 and looks like it can not
> perform external sort and blows with OOM.
>
> https://forums.databricks.com/questions/2082/i-got-the-akka-frame-size-exceeded-exception.html
>
> Now it's great that it has been addressed in spark 1.5 release but why
> databricks advocating to switch to DataFrames? It may make sense for batch
> jobs or near real-time jobs but not sure if they do when you are 
> developing
> real time analytics where you want to optimize every millisecond that you
> can. Again I am still knowledging myself with DataFrame APIs and
> optimizations and I will benchmark it against RDD for our batch and
> real-time use case as well.
>
> On Mon, Jan 25, 2016 at 9:47 AM, Mark Hamstra  > wrote:
>
>> What do you think is preventing you from optimizing your
>> own RDD-level transformations and actions?  AFAIK, nothing that has been
>> added in Catalyst precludes you from doing that.  The fact of the matter
>> is, though, that there is less type and semantic information available to
>> Spark from the raw RDD API than from using Spark SQL, DataFrames or
>> DataSets.  That means that Spark itself can't optimize for raw RDDs the
>> same way that it can for higher-level constructs that can leverage
>> Catalyst; but if you want to write your own optimizations based on your 
>> own
>> knowledge of the data types and semantics that are hiding in your raw 
>> RDDs,
>> there's no reason that you can't do that.
>>
>> On Mon, Jan 25, 2016 at 9:35 AM, Nirav Patel 
>> wrote:
>>
>>> Hi,
>>>
>>> Perhaps I should write a blog about this that why spark is focusing
>>> more on writing easier spark jobs and hiding underlaying performance
>>> optimization details from a seasoned spark users. It's one thing to 
>>> provide
>>> such abstract framework that does optimization for you so you don't 
>>> have to
>>> worry about it as a data scientist or data analyst but what about
>>> developers who do not want overhead of SQL and Optimizers and 
>>> unnecessary
>>> abstractions ! Application designer who knows their data and queries 
>>> should
>>> be able to optimize at RDD level transformations and actions. Does spark
>>> provides a way to achieve same 

Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-02 Thread Jerry Lam
Hi Michael,

Is there a section in the spark documentation demonstrate how to serialize
arbitrary objects in Dataframe? The last time I did was using some User
Defined Type (copy from VectorUDT).

Best Regards,

Jerry

On Tue, Feb 2, 2016 at 8:46 PM, Michael Armbrust 
wrote:

> A principal difference between RDDs and DataFrames/Datasets is that the
>> latter have a schema associated to them. This means that they support only
>> certain types (primitives, case classes and more) and that they are
>> uniform, whereas RDDs can contain any serializable object and must not
>> necessarily be uniform. These properties make it possible to generate very
>> efficient serialization and other optimizations that cannot be achieved
>> with plain RDDs.
>>
>
> You can use Encoder.kryo() as well to serialize arbitrary objects, just
> like with RDDs.
>


Union of RDDs without the overhead of Union

2016-02-02 Thread Jerry Lam
Hi Spark users and developers,

anyone knows how to union two RDDs without the overhead of it?

say rdd1.union(rdd2).saveTextFile(..)
This requires a stage to union the 2 rdds before saveAsTextFile (2 stages).
Is there a way to skip the union step but have the contents of the two rdds
save to the same output text file?

Thank you!

Jerry


Re: Spark DataFrame Catalyst - Another Oracle like query optimizer?

2016-02-02 Thread Jerry Lam
Hi Nirav,
I'm sure you read this?
https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

There is a benchmark in the article to show that dataframe "can" outperform
RDD implementation by 2 times. Of course, benchmarks can be "made". But
from the code snippet you wrote, I "think" dataframe will choose between
different join implementation based on the data statistics.

I cannot comment on the beauty of it because "beauty is in the eye of the
beholder" LOL
Regarding the comment on error prone, can you say why you think it is the
case? Relative to what other ways?

Best Regards,

Jerry


On Tue, Feb 2, 2016 at 8:59 PM, Nirav Patel <npa...@xactlycorp.com> wrote:

> I dont understand why one thinks RDD of case object doesn't have
> types(schema) ? If spark can convert RDD to DataFrame which means it
> understood the schema. SO then from that point why one has to use SQL
> features to do further processing? If all spark need for optimizations is
> schema then what this additional SQL features buys ? If there is a way to
> avoid SQL feature using DataFrame I don't mind it. But looks like I have to
> convert all my existing transformation to things like
> df1.join(df2,df1('abc') == df2('abc'), 'left_outer') .. that's plain ugly
> and error prone in my opinion.
>
> On Tue, Feb 2, 2016 at 5:49 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Michael,
>>
>> Is there a section in the spark documentation demonstrate how to
>> serialize arbitrary objects in Dataframe? The last time I did was using
>> some User Defined Type (copy from VectorUDT).
>>
>> Best Regards,
>>
>> Jerry
>>
>> On Tue, Feb 2, 2016 at 8:46 PM, Michael Armbrust <mich...@databricks.com>
>> wrote:
>>
>>> A principal difference between RDDs and DataFrames/Datasets is that the
>>>> latter have a schema associated to them. This means that they support only
>>>> certain types (primitives, case classes and more) and that they are
>>>> uniform, whereas RDDs can contain any serializable object and must not
>>>> necessarily be uniform. These properties make it possible to generate very
>>>> efficient serialization and other optimizations that cannot be achieved
>>>> with plain RDDs.
>>>>
>>>
>>> You can use Encoder.kryo() as well to serialize arbitrary objects, just
>>> like with RDDs.
>>>
>>
>>
>
>
>
> [image: What's New with Xactly] <http://www.xactlycorp.com/email-click/>
>
> <https://www.nyse.com/quote/XNYS:XTLY>  [image: LinkedIn]
> <https://www.linkedin.com/company/xactly-corporation>  [image: Twitter]
> <https://twitter.com/Xactly>  [image: Facebook]
> <https://www.facebook.com/XactlyCorp>  [image: YouTube]
> <http://www.youtube.com/xactlycorporation>
>


Re: Spark, Mesos, Docker and S3

2016-01-26 Thread Jerry Lam
Hi Mao,

Can you try --jars to include those jars?

Best Regards,

Jerry

Sent from my iPhone

> On 26 Jan, 2016, at 7:02 pm, Mao Geng  wrote:
> 
> Hi there, 
> 
> I am trying to run Spark on Mesos using a Docker image as executor, as 
> mentioned 
> http://spark.apache.org/docs/latest/running-on-mesos.html#mesos-docker-support.
>  
> 
> I built a docker image using the following Dockerfile (which is based on 
> https://github.com/apache/spark/blob/master/docker/spark-mesos/Dockerfile):
> 
> FROM mesosphere/mesos:0.25.0-0.2.70.ubuntu1404
> 
> # Update the base ubuntu image with dependencies needed for Spark
> RUN apt-get update && \
> apt-get install -y python libnss3 openjdk-7-jre-headless curl
> 
> RUN curl 
> http://www.carfab.com/apachesoftware/spark/spark-1.6.0/spark-1.6.0-bin-hadoop2.6.tgz
>  | tar -xzC /opt && \
> ln -s /opt/spark-1.6.0-bin-hadoop2.6 /opt/spark
> ENV SPARK_HOME /opt/spark
> ENV MESOS_NATIVE_JAVA_LIBRARY /usr/local/lib/libmesos.so
> 
> Then I successfully ran spark-shell via this docker command:
> docker run --rm -it --net=host /: 
> /opt/spark/bin/spark-shell --master mesos://:5050 --conf 
> /: 
> 
> So far so good. Then I wanted to call sc.textFile to load a file from S3, but 
> I was blocked by some issues which I couldn't figure out. I've read 
> https://dzone.com/articles/uniting-spark-parquet-and-s3-as-an-alternative-to 
> and 
> http://blog.encomiabile.it/2015/10/29/apache-spark-amazon-s3-and-apache-mesos,
>  learned that I need to add hadood-aws-2.7.1 and aws-java-sdk-2.7.4 into the 
> executor and driver's classpaths, in order to access s3 files. 
> 
> So, I added following lines into Dockerfile and build a new image. 
> RUN curl 
> https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk/1.7.4/aws-java-sdk-1.7.4.jar
>  -o /opt/spark/lib/aws-java-sdk-1.7.4.jar
> RUN curl 
> http://central.maven.org/maven2/org/apache/hadoop/hadoop-aws/2.7.1/hadoop-aws-2.7.1.jar
>  -o /opt/spark/lib/hadoop-aws-2.7.1.jar
> 
> Then I started spark-shell again with below command: 
> docker run --rm -it --net=host /: 
> /opt/spark/bin/spark-shell --master mesos://:5050 --conf 
> /: --conf 
> spark.executor.extraClassPath=/opt/spark/lib/hadoop-aws-2.7.1.jar:/opt/spark/lib/aws-java-sdk-1.7.4.jar
>  --conf 
> spark.driver.extraClassPath=/opt/spark/lib/hadoop-aws-2.7.1.jar:/opt/spark/lib/aws-java-sdk-1.7.4.jar
> 
> But below command failed when I ran it in spark-shell: 
> scala> sc.textFile("s3a:///").count()
> [Stage 0:>  (0 + 2) / 
> 2]16/01/26 23:05:23 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, 
> ip-172-16-14-203.us-west-2.compute.internal): java.lang.RuntimeException: 
> java.lang.ClassNotFoundException: Class 
> org.apache.hadoop.fs.s3a.S3AFileSystem not found
>   at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2074)
>   at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2578)
>   at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
>   at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
>   at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
>   at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
>   at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
>   at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
>   at 
> org.apache.hadoop.mapred.LineRecordReader.(LineRecordReader.java:107)
>   at 
> org.apache.hadoop.mapred.TextInputFormat.getRecordReader(TextInputFormat.java:67)
>   at org.apache.spark.rdd.HadoopRDD$$anon$1.(HadoopRDD.scala:237)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:208)
>   at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:101)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
>   at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
>   at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
>   at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
>   at org.apache.spark.scheduler.Task.run(Task.scala:89)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassNotFoundException: Class 
> org.apache.hadoop.fs.s3a.S3AFileSystem not found
>   at 
> org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:1980)
>   at 
> org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2072)
>   ... 23 more
> 
> I checked hadoop-aws-2.7.1.jar, the 

Re: sqlContext.cacheTable("tableName") vs dataFrame.cache()

2016-01-19 Thread Jerry Lam
Is cacheTable similar to asTempTable before? 

Sent from my iPhone

> On 19 Jan, 2016, at 4:18 am, George Sigletos  wrote:
> 
> Thanks Kevin for your reply.
> 
> I was suspecting the same thing as well, although it still does not make much 
> sense to me why would you need to do both:
> myData.cache()
> sqlContext.cacheTable("myData")
> 
> in case you are using both sqlContext and dataframes to execute queries
> 
> dataframe.select(...) and sqlContext.sql("select ...") are equivalent, as far 
> as I understand
> 
> Kind regards,
> George
> 
>> On Fri, Jan 15, 2016 at 6:15 PM, Kevin Mellott  
>> wrote:
>> Hi George,
>> 
>> I believe that sqlContext.cacheTable("tableName") is to be used when you 
>> want to cache the data that is being used within a Spark SQL query. For 
>> example, take a look at the code below.
>>  
>>> val myData = sqlContext.load("com.databricks.spark.csv", Map("path" -> 
>>> "hdfs://somepath/file", "header" -> "false").toDF("col1", "col2")
>>> myData.registerTempTable("myData")  
>> 
>> Here, the usage of cache() will affect ONLY the myData.select query. 
>>> myData.cache() 
>>> myData.select("col1", "col2").show() 
>>  
>> Here, the usage of cacheTable will affect ONLY the sqlContext.sql query.
>>> sqlContext.cacheTable("myData")
>>> sqlContext.sql("SELECT col1, col2 FROM myData").show()
>> 
>> Thanks,
>> Kevin
>> 
>>> On Fri, Jan 15, 2016 at 7:00 AM, George Sigletos  
>>> wrote:
>>> According to the documentation they are exactly the same, but in my queries 
>>> 
>>> dataFrame.cache() 
>>> 
>>> results in much faster execution times vs doing 
>>> 
>>> sqlContext.cacheTable("tableName")
>>> 
>>> Is there any explanation about this? I am not caching the RDD prior to 
>>> creating the dataframe. Using Pyspark on Spark 1.5.2
>>> 
>>> Kind regards,
>>> George
> 


[Spark-SQL] from_unixtime with user-specified timezone

2016-01-18 Thread Jerry Lam
Hi spark users and developers,

what do you do if you want the from_unixtime function in spark sql to
return the timezone you want instead of the system timezone?

Best Regards,

Jerry


Re: [Spark-SQL] from_unixtime with user-specified timezone

2016-01-18 Thread Jerry Lam
Thanks Alex:

So you suggested something like:
from_utc_timestamp(to_utc_timestamp(from_unixtime(1389802875),'America/Montreal'),
'America/Los_Angeles')?

This is a lot of conversion :)

Is there a particular reason not to have from_unixtime to take timezone
information?

I think I will make a UDF if this is the only way out of the box.

Thanks!

Jerry

On Mon, Jan 18, 2016 at 2:32 PM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> Look at
> to_utc_timestamp
>
> from_utc_timestamp
> On Jan 18, 2016 9:39 AM, "Jerry Lam" <chiling...@gmail.com> wrote:
>
>> Hi spark users and developers,
>>
>> what do you do if you want the from_unixtime function in spark sql to
>> return the timezone you want instead of the system timezone?
>>
>> Best Regards,
>>
>> Jerry
>>
>


Re: DataFrameWriter on partitionBy for parquet eat all RAM

2016-01-15 Thread Jerry Lam
Hi Michael,

Thanks for sharing the tip. It will help to the write path of the partitioned 
table. 
Do you have similar suggestion on reading the partitioned table back when there 
is a million of distinct values on the partition field (for example on user 
id)? Last time I have trouble to read a partitioned table because it takes very 
long (over hours on s3) to execute the 
sqlcontext.read.parquet("partitioned_table").

Best Regards,

Jerry

Sent from my iPhone

> On 15 Jan, 2016, at 3:59 pm, Michael Armbrust <mich...@databricks.com> wrote:
> 
> See here for some workarounds: 
> https://issues.apache.org/jira/browse/SPARK-12546
> 
>> On Thu, Jan 14, 2016 at 6:46 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> Hi Arkadiusz,
>> 
>> the partitionBy is not designed to have many distinct value the last time I 
>> used it. If you search in the mailing list, I think there are couple of 
>> people also face similar issues. For example, in my case, it won't work over 
>> a million distinct user ids. It will require a lot of memory and very long 
>> time to read the table back. 
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>>> On Thu, Jan 14, 2016 at 2:31 PM, Arkadiusz Bicz <arkadiusz.b...@gmail.com> 
>>> wrote:
>>> Hi
>>> 
>>> What is the proper configuration for saving parquet partition with
>>> large number of repeated keys?
>>> 
>>> On bellow code I load 500 milion rows of data and partition it on
>>> column with not so many different values.
>>> 
>>> Using spark-shell with 30g per executor and driver and 3 executor cores
>>> 
>>> sqlContext.read.load("hdfs://notpartitioneddata").write.partitionBy("columnname").parquet("partitioneddata")
>>> 
>>> 
>>> Job failed because not enough memory in executor :
>>> 
>>> WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by
>>> YARN for exceeding memory limits. 43.5 GB of 43.5 GB physical memory
>>> used. Consider boosting spark.yarn.executor.memoryOverhead.
>>> 16/01/14 17:32:38 ERROR YarnScheduler: Lost executor 11 on
>>> datanode2.babar.poc: Container killed by YARN for exceeding memory
>>> limits. 43.5 GB of 43.5 GB physical memory used. Consider boosting
>>> spark.yarn.executor.memoryOverhead.
>>> 
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: How To Save TF-IDF Model In PySpark

2016-01-15 Thread Jerry Lam
Can you save it to parquet with the vector in one field?

Sent from my iPhone

> On 15 Jan, 2016, at 7:33 pm, Andy Davidson  
> wrote:
> 
> Are you using 1.6.0 or an older version?
> 
> I think I remember something in 1.5.1 saying save was not implemented in 
> python.
> 
> 
> The current doc does not say anything about save()
> http://spark.apache.org/docs/latest/mllib-feature-extraction.html#tf-idf
> 
> http://spark.apache.org/docs/latest/ml-guide.html#saving-and-loading-pipelines
> "Often times it is worth it to save a model or a pipeline to disk for later 
> use. In Spark 1.6, a model import/export functionality was added to the 
> Pipeline API. Most basic transformers are supported as well as some of the 
> more basic ML models. Please refer to the algorithm’s API documentation to 
> see if saving and loading is supported."
> 
> andy
> 
> 
> 
> 
> From: Asim Jalis 
> Date: Friday, January 15, 2016 at 4:02 PM
> To: "user @spark" 
> Subject: How To Save TF-IDF Model In PySpark
> 
> Hi,
> 
> I am trying to save a TF-IDF model in PySpark. Looks like this is not
> supported. 
> 
> Using `model.save()` causes:
> 
> AttributeError: 'IDFModel' object has no attribute 'save'
> 
> Using `pickle` causes:
> 
> TypeError: can't pickle lock objects
> 
> Does anyone have suggestions 
> 
> Thanks!
> 
> Asim
> 
> Here is the full repro. Start pyspark shell and then run this code in
> it.
> 
> ```
> # Imports
> from pyspark import SparkContext
> from pyspark.mllib.feature import HashingTF
> 
> from pyspark.mllib.regression import LabeledPoint
> from pyspark.mllib.regression import Vectors
> from pyspark.mllib.feature import IDF
> 
> # Create some data
> n = 4
> freqs = [
> Vectors.sparse(n, (1, 3), (1.0, 2.0)), 
> Vectors.dense([0.0, 1.0, 2.0, 3.0]), 
> Vectors.sparse(n, [1], [1.0])]
> data = sc.parallelize(freqs)
> idf = IDF()
> model = idf.fit(data)
> tfidf = model.transform(data)
> 
> # View
> for r in tfidf.collect(): print(r)
> 
> # Try to save it
> model.save("foo.model")
> 
> # Try to save it with Pickle
> import pickle
> pickle.dump(model, open("model.p", "wb"))
> pickle.dumps(model)
> ```


Re: DataFrameWriter on partitionBy for parquet eat all RAM

2016-01-14 Thread Jerry Lam
Hi Arkadiusz,

the partitionBy is not designed to have many distinct value the last time I
used it. If you search in the mailing list, I think there are couple of
people also face similar issues. For example, in my case, it won't work
over a million distinct user ids. It will require a lot of memory and very
long time to read the table back.

Best Regards,

Jerry

On Thu, Jan 14, 2016 at 2:31 PM, Arkadiusz Bicz 
wrote:

> Hi
>
> What is the proper configuration for saving parquet partition with
> large number of repeated keys?
>
> On bellow code I load 500 milion rows of data and partition it on
> column with not so many different values.
>
> Using spark-shell with 30g per executor and driver and 3 executor cores
>
>
> sqlContext.read.load("hdfs://notpartitioneddata").write.partitionBy("columnname").parquet("partitioneddata")
>
>
> Job failed because not enough memory in executor :
>
> WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by
> YARN for exceeding memory limits. 43.5 GB of 43.5 GB physical memory
> used. Consider boosting spark.yarn.executor.memoryOverhead.
> 16/01/14 17:32:38 ERROR YarnScheduler: Lost executor 11 on
> datanode2.babar.poc: Container killed by YARN for exceeding memory
> limits. 43.5 GB of 43.5 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


[Spark SQL]: Issues with writing dataframe with Append Mode to Parquet

2016-01-12 Thread Jerry Lam
Hi spark users and developers,

I wonder if the following observed behaviour is expected. I'm writing
dataframe to parquet into s3. I'm using append mode when I'm writing to it.
Since I'm using org.apache.spark.sql.
parquet.DirectParquetOutputCommitter as
the spark.sql.parquet.output.committer.class, I expected that no _temporary
files will be generated.

I appended the same dataframe twice to the same directory. The first
"append" works as expected; no _temporary files are generated because of
the DirectParquetOutputCommitter but the second "append" does generate
_temporary files and then it moved the files under the _temporary to the
output directory.

Is this behavior expected? Or is it a bug?

I'm using Spark 1.5.2.

Best Regards,

Jerry


Re: [Spark SQL]: Issues with writing dataframe with Append Mode to Parquet

2016-01-12 Thread Jerry Lam
Hi Michael,

Thanks for the hint! So if I turn off speculation, consecutive appends like
the ones above will not produce temporary files, right?
Which class is responsible for disabling the use of the DirectOutputCommitter?
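
For reference, a minimal sketch of the two settings being discussed, assuming Spark 1.5.x; whether the direct committer is actually honoured still depends on Spark's internal check that falls back to the default committer when speculation is enabled:

```scala
import org.apache.spark.SparkConf

// Sketch only: speculation off, plus the committer class named earlier in this thread.
val conf = new SparkConf()
  .set("spark.speculation", "false")
  .set("spark.sql.parquet.output.committer.class",
       "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
```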

Thank you,

Jerry


On Tue, Jan 12, 2016 at 4:12 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> There can be dataloss when you are using the DirectOutputCommitter and
> speculation is turned on, so we disable it automatically.
>
> On Tue, Jan 12, 2016 at 1:11 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi spark users and developers,
>>
>> I wonder if the following observed behaviour is expected. I'm writing
>> dataframe to parquet into s3. I'm using append mode when I'm writing to it.
>> Since I'm using org.apache.spark.sql.
>> parquet.DirectParquetOutputCommitter as
>> the spark.sql.parquet.output.committer.class, I expected that no _temporary
>> files will be generated.
>>
>> I appended the same dataframe twice to the same directory. The first
>> "append" works as expected; no _temporary files are generated because of
>> the DirectParquetOutputCommitter but the second "append" does generate
>> _temporary files and then it moved the files under the _temporary to the
>> output directory.
>>
>> Is this behavior expected? Or is it a bug?
>>
>> I'm using Spark 1.5.2.
>>
>> Best Regards,
>>
>> Jerry
>>
>
>


Re: SparkSQL integration issue with AWS S3a

2016-01-06 Thread Jerry Lam
Hi Kostiantyn,

Yes. If security is a concern, then this approach cannot satisfy it: the keys 
are visible in the properties files. If the goal is to hide them, you might be 
able to go a bit further with this approach. Have you looked at the Spark 
security page?

Best Regards,

Jerry 

Sent from my iPhone

> On 6 Jan, 2016, at 8:49 am, Kostiantyn Kudriavtsev 
> <kudryavtsev.konstan...@gmail.com> wrote:
> 
> Hi guys,
> 
> the only one big issue with this approach:
>>> spark.hadoop.s3a.access.key  is now visible everywhere, in logs, in spark 
>>> webui and is not secured at all...
> 
>> On Jan 2, 2016, at 11:13 AM, KOSTIANTYN Kudriavtsev 
>> <kudryavtsev.konstan...@gmail.com> wrote:
>> 
>> thanks Jerry, it works!
>> really appreciate your help 
>> 
>> Thank you,
>> Konstantin Kudryavtsev
>> 
>>> On Fri, Jan 1, 2016 at 4:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>> Hi Kostiantyn,
>>> 
>>> You should be able to use spark.conf to specify s3a keys.
>>> 
>>> I don't remember exactly but you can add hadoop properties by prefixing 
>>> spark.hadoop.*
>>> * is the s3a properties. For instance,
>>> 
>>> spark.hadoop.s3a.access.key wudjgdueyhsj
>>> 
>>> Of course, you need to make sure the property key is right. I'm using my 
>>> phone so I cannot easily verifying.
>>> 
>>> Then you can specify different user using different spark.conf via 
>>> --properties-file when spark-submit
>>> 
>>> HTH,
>>> 
>>> Jerry
>>> 
>>> Sent from my iPhone
>>> 
>>>> On 31 Dec, 2015, at 2:06 pm, KOSTIANTYN Kudriavtsev 
>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>> 
>>>> Hi Jerry,
>>>> 
>>>> what you suggested looks to be working (I put hdfs-site.xml into 
>>>> $SPARK_HOME/conf folder), but could you shed some light on how it can be 
>>>> federated per user?
>>>> Thanks in advance!
>>>> 
>>>> Thank you,
>>>> Konstantin Kudryavtsev
>>>> 
>>>>> On Wed, Dec 30, 2015 at 2:37 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>> Hi Kostiantyn,
>>>>> 
>>>>> I want to confirm that it works first by using hdfs-site.xml. If yes, you 
>>>>> could define different spark-{user-x}.conf and source them during 
>>>>> spark-submit. let us know if hdfs-site.xml works first. It should.
>>>>> 
>>>>> Best Regards,
>>>>> 
>>>>> Jerry
>>>>> 
>>>>> Sent from my iPhone
>>>>> 
>>>>>> On 30 Dec, 2015, at 2:31 pm, KOSTIANTYN Kudriavtsev 
>>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Jerry,
>>>>>> 
>>>>>> I want to run different jobs on different S3 buckets - different AWS 
>>>>>> creds - on the same instances. Could you shed some light if it's 
>>>>>> possible to achieve with hdfs-site?
>>>>>> 
>>>>>> Thank you,
>>>>>> Konstantin Kudryavtsev
>>>>>> 
>>>>>>> On Wed, Dec 30, 2015 at 2:10 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>>>> Hi Kostiantyn,
>>>>>>> 
>>>>>>> Can you define those properties in hdfs-site.xml and make sure it is 
>>>>>>> visible in the class path when you spark-submit? It looks like a conf 
>>>>>>> sourcing issue to me. 
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> 
>>>>>>> Sent from my iPhone
>>>>>>> 
>>>>>>>> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>>>>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>>> 
>>>>>>>> Chris,
>>>>>>>> 
>>>>>>>> thanks for the hist with AIM roles, but in my case  I need to run 
>>>>>>>> different jobs with different S3 permissions on the same cluster, so 
>>>>>>>> this approach doesn't work for me as far as I understood it
>>>>>>>> 
>>>>>>>> Thank you,
>>>>>>>> Konstantin Kudryavtsev
>>>>>>>> 
>>>>>>>>> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly <

Re: SparkSQL integration issue with AWS S3a

2016-01-01 Thread Jerry Lam
Hi Kostiantyn,

You should be able to use spark.conf to specify s3a keys.

I don't remember exactly, but you can add Hadoop properties by prefixing them 
with spark.hadoop.*, where * is the s3a property name. For instance,

spark.hadoop.s3a.access.key wudjgdueyhsj

Of course, you need to make sure the property key is right; I'm on my phone so 
I cannot easily verify it.

Then you can give each user a different spark conf file and pass it via 
--properties-file when you spark-submit.
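
A sketch of what that could look like, assuming the hadoop-aws s3a keys (the full Hadoop property names are fs.s3a.access.key and fs.s3a.secret.key); the values are placeholders, and the same two settings could live in a per-user file passed with --properties-file instead of being set programmatically:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholders only -- do not put real credentials in code.
val conf = new SparkConf()
  .set("spark.hadoop.fs.s3a.access.key", "USER_A_ACCESS_KEY")
  .set("spark.hadoop.fs.s3a.secret.key", "USER_A_SECRET_KEY")
val sc = new SparkContext(conf)
```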

HTH,

Jerry

Sent from my iPhone

> On 31 Dec, 2015, at 2:06 pm, KOSTIANTYN Kudriavtsev 
> <kudryavtsev.konstan...@gmail.com> wrote:
> 
> Hi Jerry,
> 
> what you suggested looks to be working (I put hdfs-site.xml into 
> $SPARK_HOME/conf folder), but could you shed some light on how it can be 
> federated per user?
> Thanks in advance!
> 
> Thank you,
> Konstantin Kudryavtsev
> 
>> On Wed, Dec 30, 2015 at 2:37 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> Hi Kostiantyn,
>> 
>> I want to confirm that it works first by using hdfs-site.xml. If yes, you 
>> could define different spark-{user-x}.conf and source them during 
>> spark-submit. let us know if hdfs-site.xml works first. It should.
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>> Sent from my iPhone
>> 
>>> On 30 Dec, 2015, at 2:31 pm, KOSTIANTYN Kudriavtsev 
>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>> 
>>> Hi Jerry,
>>> 
>>> I want to run different jobs on different S3 buckets - different AWS creds 
>>> - on the same instances. Could you shed some light if it's possible to 
>>> achieve with hdfs-site?
>>> 
>>> Thank you,
>>> Konstantin Kudryavtsev
>>> 
>>>> On Wed, Dec 30, 2015 at 2:10 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>> Hi Kostiantyn,
>>>> 
>>>> Can you define those properties in hdfs-site.xml and make sure it is 
>>>> visible in the class path when you spark-submit? It looks like a conf 
>>>> sourcing issue to me. 
>>>> 
>>>> Cheers,
>>>> 
>>>> Sent from my iPhone
>>>> 
>>>>> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>> 
>>>>> Chris,
>>>>> 
>>>>> thanks for the hist with AIM roles, but in my case  I need to run 
>>>>> different jobs with different S3 permissions on the same cluster, so this 
>>>>> approach doesn't work for me as far as I understood it
>>>>> 
>>>>> Thank you,
>>>>> Konstantin Kudryavtsev
>>>>> 
>>>>>> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly <ch...@fregly.com> wrote:
>>>>>> couple things:
>>>>>> 
>>>>>> 1) switch to IAM roles if at all possible - explicitly passing AWS 
>>>>>> credentials is a long and lonely road in the end
>>>>>> 
>>>>>> 2) one really bad workaround/hack is to run a job that hits every worker 
>>>>>> and writes the credentials to the proper location (~/.awscredentials or 
>>>>>> whatever)
>>>>>> 
>>>>>> ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle 
>>>>>> autoscaling, but i'm mentioning it anyway as it is a temporary fix.
>>>>>> 
>>>>>> if you switch to IAM roles, things become a lot easier as you can 
>>>>>> authorize all of the EC2 instances in the cluster - and handles 
>>>>>> autoscaling very well - and at some point, you will want to autoscale.
>>>>>> 
>>>>>>> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev 
>>>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>> Chris,
>>>>>>> 
>>>>>>>  good question, as you can see from the code I set up them on driver, 
>>>>>>> so I expect they will be propagated to all nodes, won't them?
>>>>>>> 
>>>>>>> Thank you,
>>>>>>> Konstantin Kudryavtsev
>>>>>>> 
>>>>>>>> On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly <ch...@fregly.com> wrote:
>>>>>>>> are the credentials visible from each Worker node to all the Executor 
>>>>>>>> JVMs on each Worker?
>>>>>>>> 
>>>>>>>>> On Dec 30, 2015, at 12:45 PM, K

Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Jerry Lam
Hi Kostiantyn,

Can you define those properties in hdfs-site.xml and make sure it is visible on 
the classpath when you spark-submit? It looks like a conf sourcing issue to me. 
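
A quick driver-side sanity check (just a sketch) for whether the file on the classpath was actually picked up:

```scala
// If hdfs-site.xml / core-site.xml was sourced, the s3a key should come back non-null.
val accessKey = sc.hadoopConfiguration.get("fs.s3a.access.key")
if (accessKey == null) println("fs.s3a.access.key is NOT set")
else println("fs.s3a.access.key is set")
```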

Cheers,

Sent from my iPhone

> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>  wrote:
> 
> Chris,
> 
> thanks for the hist with AIM roles, but in my case  I need to run different 
> jobs with different S3 permissions on the same cluster, so this approach 
> doesn't work for me as far as I understood it
> 
> Thank you,
> Konstantin Kudryavtsev
> 
>> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly  wrote:
>> couple things:
>> 
>> 1) switch to IAM roles if at all possible - explicitly passing AWS 
>> credentials is a long and lonely road in the end
>> 
>> 2) one really bad workaround/hack is to run a job that hits every worker and 
>> writes the credentials to the proper location (~/.awscredentials or whatever)
>> 
>> ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle 
>> autoscaling, but i'm mentioning it anyway as it is a temporary fix.
>> 
>> if you switch to IAM roles, things become a lot easier as you can authorize 
>> all of the EC2 instances in the cluster - and handles autoscaling very well 
>> - and at some point, you will want to autoscale.
>> 
>>> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev 
>>>  wrote:
>>> Chris,
>>> 
>>>  good question, as you can see from the code I set up them on driver, so I 
>>> expect they will be propagated to all nodes, won't them?
>>> 
>>> Thank you,
>>> Konstantin Kudryavtsev
>>> 
 On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly  wrote:
 are the credentials visible from each Worker node to all the Executor JVMs 
 on each Worker?
 
> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev 
>  wrote:
> 
> Dear Spark community,
> 
> I faced the following issue with trying accessing data on S3a, my code is 
> the following:
> 
> val sparkConf = new SparkConf()
> 
> val sc = new SparkContext(sparkConf)
> sc.hadoopConfiguration.set("fs.s3a.impl", 
> "org.apache.hadoop.fs.s3a.S3AFileSystem")
> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
> val sqlContext = SQLContext.getOrCreate(sc)
> val df = sqlContext.read.parquet(...)
> df.count
> 
> It results in the following exception and log messages:
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from BasicAWSCredentialsProvider: Access key or secret key is 
> null
> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
> metadata service at URL: 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
> credentials from InstanceProfileCredentialsProvider: The requested 
> metadata is not found at 
> http://x.x.x.x/latest/meta-data/iam/security-credentials/
> 15/12/30 17:00:32 ERROR Executor: Exception in task 1.0 in stage 1.0 (TID 
> 3)
> com.amazonaws.AmazonClientException: Unable to load AWS credentials from 
> any provider in the chain
>   at 
> com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
>   at 
> org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
> 
> I run standalone spark 1.5.2 and using hadoop 2.7.1
> 
> any ideas/workarounds?
> 
> AWS credentials are correct for this bucket
> 
> Thank you,
> Konstantin Kudryavtsev
>> 
>> 
>> 
>> -- 
>> 
>> Chris Fregly
>> Principal Data Solutions Engineer
>> IBM Spark Technology Center, San Francisco, CA
>> http://spark.tc | http://advancedspark.com
> 


Re: SparkSQL integration issue with AWS S3a

2015-12-30 Thread Jerry Lam
Hi Kostiantyn,

I want to confirm first that it works using hdfs-site.xml. If yes, you could 
define different spark-{user-x}.conf files and source them during spark-submit. 
Let us know if hdfs-site.xml works first; it should.

Best Regards,

Jerry

Sent from my iPhone

> On 30 Dec, 2015, at 2:31 pm, KOSTIANTYN Kudriavtsev 
> <kudryavtsev.konstan...@gmail.com> wrote:
> 
> Hi Jerry,
> 
> I want to run different jobs on different S3 buckets - different AWS creds - 
> on the same instances. Could you shed some light if it's possible to achieve 
> with hdfs-site?
> 
> Thank you,
> Konstantin Kudryavtsev
> 
>> On Wed, Dec 30, 2015 at 2:10 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> Hi Kostiantyn,
>> 
>> Can you define those properties in hdfs-site.xml and make sure it is visible 
>> in the class path when you spark-submit? It looks like a conf sourcing issue 
>> to me. 
>> 
>> Cheers,
>> 
>> Sent from my iPhone
>> 
>>> On 30 Dec, 2015, at 1:59 pm, KOSTIANTYN Kudriavtsev 
>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>> 
>>> Chris,
>>> 
>>> thanks for the hist with AIM roles, but in my case  I need to run different 
>>> jobs with different S3 permissions on the same cluster, so this approach 
>>> doesn't work for me as far as I understood it
>>> 
>>> Thank you,
>>> Konstantin Kudryavtsev
>>> 
>>>> On Wed, Dec 30, 2015 at 1:48 PM, Chris Fregly <ch...@fregly.com> wrote:
>>>> couple things:
>>>> 
>>>> 1) switch to IAM roles if at all possible - explicitly passing AWS 
>>>> credentials is a long and lonely road in the end
>>>> 
>>>> 2) one really bad workaround/hack is to run a job that hits every worker 
>>>> and writes the credentials to the proper location (~/.awscredentials or 
>>>> whatever)
>>>> 
>>>> ^^ i wouldn't recommend this. ^^  it's horrible and doesn't handle 
>>>> autoscaling, but i'm mentioning it anyway as it is a temporary fix.
>>>> 
>>>> if you switch to IAM roles, things become a lot easier as you can 
>>>> authorize all of the EC2 instances in the cluster - and handles 
>>>> autoscaling very well - and at some point, you will want to autoscale.
>>>> 
>>>>> On Wed, Dec 30, 2015 at 1:08 PM, KOSTIANTYN Kudriavtsev 
>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>> Chris,
>>>>> 
>>>>>  good question, as you can see from the code I set up them on driver, so 
>>>>> I expect they will be propagated to all nodes, won't them?
>>>>> 
>>>>> Thank you,
>>>>> Konstantin Kudryavtsev
>>>>> 
>>>>>> On Wed, Dec 30, 2015 at 1:06 PM, Chris Fregly <ch...@fregly.com> wrote:
>>>>>> are the credentials visible from each Worker node to all the Executor 
>>>>>> JVMs on each Worker?
>>>>>> 
>>>>>>> On Dec 30, 2015, at 12:45 PM, KOSTIANTYN Kudriavtsev 
>>>>>>> <kudryavtsev.konstan...@gmail.com> wrote:
>>>>>>> 
>>>>>>> Dear Spark community,
>>>>>>> 
>>>>>>> I faced the following issue with trying accessing data on S3a, my code 
>>>>>>> is the following:
>>>>>>> 
>>>>>>> val sparkConf = new SparkConf()
>>>>>>> 
>>>>>>> val sc = new SparkContext(sparkConf)
>>>>>>> sc.hadoopConfiguration.set("fs.s3a.impl", 
>>>>>>> "org.apache.hadoop.fs.s3a.S3AFileSystem")
>>>>>>> sc.hadoopConfiguration.set("fs.s3a.access.key", "---")
>>>>>>> sc.hadoopConfiguration.set("fs.s3a.secret.key", "---")
>>>>>>> val sqlContext = SQLContext.getOrCreate(sc)
>>>>>>> val df = sqlContext.read.parquet(...)
>>>>>>> df.count
>>>>>>> 
>>>>>>> It results in the following exception and log messages:
>>>>>>> 15/12/30 17:00:32 DEBUG AWSCredentialsProviderChain: Unable to load 
>>>>>>> credentials from BasicAWSCredentialsProvider: Access key or secret key 
>>>>>>> is null
>>>>>>> 15/12/30 17:00:32 DEBUG EC2MetadataClient: Connecting to EC2 instance 
>>>>>>> metadata service at URL: 
>>>>>>> h

Re: ideal number of executors per machine

2015-12-15 Thread Jerry Lam
Hi Veljko,

I usually ask the following questions: “how much memory per task?” and then “how 
many CPUs per task?”, and then I calculate based on the memory and CPU 
requirements per task. You might be surprised (maybe not you, but at least I am 
:) ) how many OOM issues are actually because of this. 
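
As a rough sketch of the arithmetic (all numbers below are assumptions for a 64 GB / 32 core box, not recommendations):

```scala
// Back-of-envelope sizing: memory available per concurrent task.
val executorMemoryGb = 48.0  // spark.executor.memory, leaving room for OS + overhead
val executorCores    = 32    // spark.executor.cores = concurrent tasks per executor
val usableFraction   = 0.6   // rough share of the heap left after Spark's own overhead
val memoryPerTaskGb  = executorMemoryGb * usableFraction / executorCores
println(f"~$memoryPerTaskGb%.1f GB per concurrent task")  // ~0.9 GB with these numbers
```

If a single task needs more than that, either give the executor fewer cores or split one big executor into several smaller ones.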

Best Regards,

Jerry

> On Dec 15, 2015, at 5:18 PM, Jakob Odersky  wrote:
> 
> Hi Veljko,
> I would assume keeping the number of executors per machine to a minimum is 
> best for performance (as long as you consider memory requirements as well).
> Each executor is a process that can run tasks in multiple threads. On a 
> kernel/hardware level, thread switches are much cheaper than process switches 
> and therefore having a single executor with multiple threads gives a better 
> over-all performance that multiple executors with less threads.
> 
> --Jakob
> 
> On 15 December 2015 at 13:07, Veljko Skarich  > wrote:
> Hi, 
> 
> I'm looking for suggestions on the ideal number of executors per machine. I 
> run my jobs on 64G 32 core machines, and at the moment I have one executor 
> running per machine, on the spark standalone cluster.
> 
>  I could not find many guidelines for figuring out the ideal number of 
> executors; the Spark official documentation merely recommends not having more 
> than 64G per executor to avoid GC issues. Anyone have and advice on this?
> 
> thank you. 
> 



Re: spark-ec2 vs. EMR

2015-12-02 Thread Jerry Lam
Hi Dana,

Yes, we got VPC + EMR working, but I'm not the person who deploys it. It is
related to the subnet, as Alex points out.

Just to add another point: spark-ec2 is nice to keep and improve because it
allows users to run any version of Spark (a nightly build, for example). EMR
does not allow you to do that without a manual process.

Best Regards,

Jerry

On Wed, Dec 2, 2015 at 1:02 PM, Alexander Pivovarov <apivova...@gmail.com>
wrote:

> Do you think it's a security issue if EMR started in VPC with a subnet
> having Auto-assign Public IP: Yes
>
> you can remove all Inbound rules having 0.0.0.0/0 Source in master and
> slave Security Group
> So, master and slave boxes will be accessible only for users who are on VPN
>
>
>
>
> On Wed, Dec 2, 2015 at 9:44 AM, Dana Powers <dana.pow...@gmail.com> wrote:
>
>> EMR was a pain to configure on a private VPC last I tried. Has anyone had
>> success with that? I found spark-ec2 easier to use w private networking,
>> but also agree that I would use for prod.
>>
>> -Dana
>> On Dec 1, 2015 12:29 PM, "Alexander Pivovarov" <apivova...@gmail.com>
>> wrote:
>>
>>> 1. Emr 4.2.0 has Zeppelin as an alternative to DataBricks Notebooks
>>>
>>> 2. Emr has Ganglia 3.6.0
>>>
>>> 3. Emr has hadoop fs settings to make s3 work fast (direct.EmrFileSystem)
>>>
>>> 4. EMR has s3 keys in hadoop configs
>>>
>>> 5. EMR allows to resize cluster on fly.
>>>
>>> 6. EMR has aws sdk in spark classpath. Helps to reduce app assembly jar
>>> size
>>>
>>> 7. ec2 script installs all in /root, EMR has dedicated users: hadoop,
>>> zeppelin, etc. EMR is similar to Cloudera or Hortonworks
>>>
>>> 8. There are at least 3 spark-ec2 projects. (in apache/spark, in mesos,
>>> in amplab). Master branch in spark has outdated ec2 script. Other projects
>>> have broken links in readme. WHAT A MESS!
>>>
>>> 9. ec2 script has bad documentation and non informative error messages.
>>> e.g. readme does not say anything about --private-ips option. If you did
>>> not add the flag it will connect to empty string host (localhost) instead
>>> of master. Fixed only last week. Not sure if fixed in all branches
>>>
>>> 10. I think Amazon will include spark-jobserver to EMR soon.
>>>
>>> 11. You do not need to be aws expert to start EMR cluster. Users can use
>>> EMR web ui to start cluster to run some jobs or work in Zeppelun during the
>>> day
>>>
>>> 12. EMR cluster starts in abour 8 min. Ec2 script works longer and you
>>> need to be online.
>>> On Dec 1, 2015 9:22 AM, "Jerry Lam" <chiling...@gmail.com> wrote:
>>>
>>>> Simply put:
>>>>
>>>> EMR = Hadoop Ecosystem (Yarn, HDFS, etc) + Spark + EMRFS + Amazon EMR
>>>> API + Selected Instance Types + Amazon EC2 Friendly (bootstrapping)
>>>> spark-ec2 = HDFS + Yarn (Optional) + Spark (Standalone Default) + Any
>>>> Instance Type
>>>>
>>>> I use spark-ec2 for prototyping and I have never use it for production.
>>>>
>>>> just my $0.02
>>>>
>>>>
>>>>
>>>> On Dec 1, 2015, at 11:15 AM, Nick Chammas <nicholas.cham...@gmail.com>
>>>> wrote:
>>>>
>>>> Pinging this thread in case anyone has thoughts on the matter they want
>>>> to share.
>>>>
>>>> On Sat, Nov 21, 2015 at 11:32 AM Nicholas Chammas <[hidden email]>
>>>> wrote:
>>>>
>>>>> Spark has come bundled with spark-ec2
>>>>> <http://spark.apache.org/docs/latest/ec2-scripts.html> for many
>>>>> years. At the same time, EMR has been capable of running Spark for a 
>>>>> while,
>>>>> and earlier this year it added "official" support
>>>>> <https://aws.amazon.com/blogs/aws/new-apache-spark-on-amazon-emr/>.
>>>>>
>>>>> If you're looking for a way to provision Spark clusters, there are
>>>>> some clear differences between these 2 options. I think the biggest one
>>>>> would be that EMR is a "production" solution backed by a company, whereas
>>>>> spark-ec2 is not really intended for production use (as far as I know).
>>>>>
>>>>> That particular difference in intended use may or may not matter to
>>>>> you, but I'm curious:
>>>>>
>>>>> What are some of the other differences between the 2 that do matter to
>>>>> you? If you were considering these 2 solutions for your use case at one
>>>>> point recently, why did you choose one over the other?
>>>>>
>>>>> I'd be especially interested in hearing about why people might choose
>>>>> spark-ec2 over EMR, since the latter option seems to have shaped up nicely
>>>>> this year.
>>>>>
>>>>> Nick
>>>>>
>>>>>
>>>> --
>>>> View this message in context: Re: spark-ec2 vs. EMR
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/Re-spark-ec2-vs-EMR-tp25538.html>
>>>> Sent from the Apache Spark User List mailing list archive
>>>> <http://apache-spark-user-list.1001560.n3.nabble.com/> at Nabble.com.
>>>>
>>>>
>>>>
>


Re: Very slow startup for jobs containing millions of tasks

2015-11-14 Thread Jerry Lam
Hi Ted, 

That looks like exactly what happens. It has been 5 hrs now, and the code was 
built for 1.4. Thank you very much! 
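
Independently of the SPARK-9952 fix Ted mentions, a common mitigation for stages with millions of tiny tasks is simply to reduce the partition count before the expensive stage; a sketch (the RDD name and target count are made up):

```scala
// coalesce without a shuffle just merges existing partitions, so the
// DAGScheduler has far fewer tasks to plan for the following stage.
val fewerPartitions = hugeRdd.coalesce(10000, shuffle = false)
fewerPartitions.count()
```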

Best Regards,

Jerry

Sent from my iPhone

> On 14 Nov, 2015, at 11:21 pm, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> Which release are you using ?
> If older than 1.5.0, you miss some fixes such as SPARK-9952
> 
> Cheers
> 
>> On Sat, Nov 14, 2015 at 6:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>> Hi spark users and developers,
>> 
>> Have anyone experience the slow startup of a job when it contains a stage 
>> with over 4 millions of tasks? 
>> The job has been pending for 1.4 hours without doing anything (please refer 
>> to the attached pictures). However, the driver is busy doing something. 
>> jstack the driver and I found the following relevant:
>> 
>> ```
>> "dag-scheduler-event-loop" daemon prio=10 tid=0x7f24a8c59800 nid=0x454 
>> runnable [0x7f23b3e29000]
>>java.lang.Thread.State: RUNNABLE
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>> at 
>> org.apache.spark.rdd.RDD$$anonfun$preferredLocations$2.apply(RDD.scala:231)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.rdd.RDD.preferredLocations(RDD.scala:230)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1399)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.getPreferredLocs(DAGScheduler.scala:1373)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:911)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$16.apply(DAGScheduler.scala:910)
>> at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>> at 
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:910)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:834)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:837)
>> at 
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$submitStage$4.apply(DAGScheduler.scala:836)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:836)
>> at 
>> org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:818)
>> at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1453)
>> at 
>> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1445)
>> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
>> ```
>> 
>> It seems that it takes long time for the driver to create/schedule the DAG 
>> for that many tasks. Is there a way to speed it up? 
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
Hi Zhan,

Thank you for providing a workaround! 
I will try this out, but I agree with Ted: there should be a better way to 
capture the exception and handle it by initializing just a SQLContext instead 
of a HiveContext, and warn the user that something is wrong with his Hive setup. 

Having a spark.sql.hive.enabled=false configuration would be lovely too. :)
An additional bonus is that it requires less memory (~100-200 MB, from a rough 
observation) if we don't use HiveContext on the driver side. 

Thanks and have a nice weekend!

Jerry


> On Nov 6, 2015, at 5:53 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> I would suggest adding a config parameter that allows bypassing 
> initialization of HiveContext in case of SQLException
> 
> Cheers
> 
> On Fri, Nov 6, 2015 at 2:50 PM, Zhan Zhang <zzh...@hortonworks.com 
> <mailto:zzh...@hortonworks.com>> wrote:
> Hi Jerry,
> 
> OK. Here is an ugly walk around.
> 
> Put a hive-site.xml under $SPARK_HOME/conf with invalid content. You will get 
> a bunch of exceptions because hive context initialization failure, but you 
> can initialize your SQLContext on your own.
> 
> scala>  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> sqlContext: org.apache.spark.sql.SQLContext = 
> org.apache.spark.sql.SQLContext@4a5cc2e8
> 
> scala> import sqlContext.implicits._
> import sqlContext.implicits._
> 
> 
> for example
> 
> HW11188:spark zzhang$ more conf/hive-site.xml
> <configuration>
>   <property>
>     <name>hive.metastore.uris</name>
>     <value>thrift://zzhang-yarn11:9083</value>
>   </property>
> </configuration>
> HW11188:spark zzhang$
> 
> By the way, I don’t know whether there is any caveat for this walk around.
> 
> Thanks.
> 
> Zhan Zhang
> 
> 
> 
> 
> 
> On Nov 6, 2015, at 2:40 PM, Jerry Lam <chiling...@gmail.com 
> <mailto:chiling...@gmail.com>> wrote:
> 
>> Hi Zhan,
>> 
>> I don’t use HiveContext features at all. I use mostly DataFrame API. It is 
>> sexier and much less typo. :)
>> Also, HiveContext requires metastore database setup (derby by default). The 
>> problem is that I cannot have 2 spark-shell sessions running at the same 
>> time in the same host (e.g. /home/jerry directory). It will give me an 
>> exception like below. 
>> 
>> Since I don’t use HiveContext, I don’t see the need to maintain a database. 
>> 
>> What is interesting is that pyspark shell is able to start more than 1 
>> session at the same time. I wonder what pyspark has done better than 
>> spark-shell?
>> 
>> Best Regards,
>> 
>> Jerry
>> 
>>> On Nov 6, 2015, at 5:28 PM, Zhan Zhang <zzh...@hortonworks.com 
>>> <mailto:zzh...@hortonworks.com>> wrote:
>>> 
>>> If you assembly jar have hive jar included, the HiveContext will be used. 
>>> Typically, HiveContext has more functionality than SQLContext. In what case 
>>> you have to use SQLContext that cannot be done by HiveContext?
>>> 
>>> Thanks.
>>> 
>>> Zhan Zhang
>>> 
>>> On Nov 6, 2015, at 10:43 AM, Jerry Lam <chiling...@gmail.com 
>>> <mailto:chiling...@gmail.com>> wrote:
>>> 
>>>> What is interesting is that pyspark shell works fine with multiple session 
>>>> in the same host even though multiple HiveContext has been created. What 
>>>> does pyspark does differently in terms of starting up the shell?
>>>> 
>>>>> On Nov 6, 2015, at 12:12 PM, Ted Yu <yuzhih...@gmail.com 
>>>>> <mailto:yuzhih...@gmail.com>> wrote:
>>>>> 
>>>>> In SQLContext.scala :
>>>>> // After we have populated SQLConf, we call setConf to populate other 
>>>>> confs in the subclass
>>>>> // (e.g. hiveconf in HiveContext).
>>>>> properties.foreach {
>>>>>   case (key, value) => setConf(key, value)
>>>>> }
>>>>> 
>>>>> I don't see config of skipping the above call.
>>>>> 
>>>>> FYI
>>>>> 
>>>>> On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam <chiling...@gmail.com 
>>>>> <mailto:chiling...@gmail.com>> wrote:
>>>>> Hi spark users and developers,
>>>>> 
>>>>> Is it possible to disable HiveContext from being instantiated when using 
>>>>> spark-shell? I got the following errors when I have more than one session 
>>>>> starts. Since I don't use HiveContext, it would be great if I ca

Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
Hi Zhan,

I don’t use HiveContext features at all; I mostly use the DataFrame API. It is 
sexier and involves much less typing. :)
Also, HiveContext requires a metastore database setup (Derby by default). The 
problem is that I cannot have 2 spark-shell sessions running at the same time 
on the same host (e.g. in the /home/jerry directory). It gives me an exception 
like the one below. 

Since I don’t use HiveContext, I don’t see the need to maintain a database. 

What is interesting is that the pyspark shell is able to start more than 1 
session at the same time. I wonder what pyspark does better than spark-shell?

Best Regards,

Jerry

> On Nov 6, 2015, at 5:28 PM, Zhan Zhang <zzh...@hortonworks.com> wrote:
> 
> If you assembly jar have hive jar included, the HiveContext will be used. 
> Typically, HiveContext has more functionality than SQLContext. In what case 
> you have to use SQLContext that cannot be done by HiveContext?
> 
> Thanks.
> 
> Zhan Zhang
> 
> On Nov 6, 2015, at 10:43 AM, Jerry Lam <chiling...@gmail.com 
> <mailto:chiling...@gmail.com>> wrote:
> 
>> What is interesting is that pyspark shell works fine with multiple session 
>> in the same host even though multiple HiveContext has been created. What 
>> does pyspark does differently in terms of starting up the shell?
>> 
>>> On Nov 6, 2015, at 12:12 PM, Ted Yu <yuzhih...@gmail.com 
>>> <mailto:yuzhih...@gmail.com>> wrote:
>>> 
>>> In SQLContext.scala :
>>> // After we have populated SQLConf, we call setConf to populate other 
>>> confs in the subclass
>>> // (e.g. hiveconf in HiveContext).
>>> properties.foreach {
>>>   case (key, value) => setConf(key, value)
>>> }
>>> 
>>> I don't see config of skipping the above call.
>>> 
>>> FYI
>>> 
>>> On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam <chiling...@gmail.com 
>>> <mailto:chiling...@gmail.com>> wrote:
>>> Hi spark users and developers,
>>> 
>>> Is it possible to disable HiveContext from being instantiated when using 
>>> spark-shell? I got the following errors when I have more than one session 
>>> starts. Since I don't use HiveContext, it would be great if I can have more 
>>> than 1 spark-shell start at the same time. 
>>> 
>>> Exception in thread "main" java.lang.RuntimeException: 
>>> java.lang.RuntimeException: Unable to instantiate 
>>> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
>>> toreClient
>>> at 
>>> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
>>> at 
>>> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
>>> Method)
>>> at 
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
>>> at 
>>> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
>>> at 
>>> org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
>>> at 
>>> org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)
>>> at 
>>> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
>>> at 
>>> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
>>> at 
>>> org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
>>> at 
>>> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
>>> at 
>>> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
>>> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>>> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>>> at 
>>> scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>> at org.apache.spark.sql.SQLContext.(SQLContext.scala:234)
>>> at 
>>> org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72)
>>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
>>> Method)
>>> at 
>>> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstruct

Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
Hi Ted,

I was trying to set spark.sql.dialect to sql to specify that I only need 
“SQLContext”, not HiveContext. It didn’t work; it still instantiates HiveContext. 
I don’t use HiveContext, and I don’t want to start a MySQL database, because I 
want to have more than 1 spark-shell session running simultaneously. Is there an 
easy way to get around it? More of the exception here:

Caused by: java.sql.SQLException: Unable to open a test connection to the given 
database. JDBC url = jdbc:derby:;databaseName=metastore_db;create=true, 
username = APP. T
erminating connection pool (set lazyInit to true if you expect to start your 
database after your app). Original Exception: --^M
java.sql.SQLException: Failed to start database 'metastore_db' with class 
loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@53a39109, 
see the next exc
eption for details.
at 
org.apache.derby.impl.jdbc.SQLExceptionFactory40.getSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.newEmbedSQLException(Unknown Source)
at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown 
Source)
        at org.apache.derby.impl.jdbc.EmbedConnection.<init>(Unknown Source)
        at org.apache.derby.impl.jdbc.EmbedConnection40.<init>(Unknown Source)
at org.apache.derby.jdbc.Driver40.getNewEmbedConnection(Unknown Source)
at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
at org.apache.derby.jdbc.Driver20.connect(Unknown Source)
at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
at java.sql.DriverManager.getConnection(DriverManager.java:571)

Best Regards,

Jerry

> On Nov 6, 2015, at 12:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> In SQLContext.scala :
> // After we have populated SQLConf, we call setConf to populate other 
> confs in the subclass
> // (e.g. hiveconf in HiveContext).
> properties.foreach {
>   case (key, value) => setConf(key, value)
> }
> 
> I don't see config of skipping the above call.
> 
> FYI
> 
> On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam <chiling...@gmail.com 
> <mailto:chiling...@gmail.com>> wrote:
> Hi spark users and developers,
> 
> Is it possible to disable HiveContext from being instantiated when using 
> spark-shell? I got the following errors when I have more than one session 
> starts. Since I don't use HiveContext, it would be great if I can have more 
> than 1 spark-shell start at the same time. 
> 
> Exception in thread "main" java.lang.RuntimeException: 
> java.lang.RuntimeException: Unable to instantiate 
> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
> toreClient
> at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
> at 
> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at 
> org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
> at 
> org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)
> at 
> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
> at 
> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
> at 
> org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
> at 
> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
> at 
> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at org.apache.spark.sql.SQLContext.(SQLContext.scala:234)
> at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.refl

[Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
Hi spark users and developers,

Is it possible to disable HiveContext from being instantiated when using
spark-shell? I got the following errors when I have more than one session
starts. Since I don't use HiveContext, it would be great if I can have more
than 1 spark-shell start at the same time.

Exception in thread "main" java.lang.RuntimeException:
java.lang.RuntimeException: Unable to instantiate
org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
toreClient
at
org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
at
org.apache.spark.sql.hive.client.ClientWrapper.<init>(ClientWrapper.scala:171)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
at
org.apache.spark.sql.hive.client.IsolatedClientLoader.<init>(IsolatedClientLoader.scala:179)
at
org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
at
org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
at
org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
at
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
at
org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at org.apache.spark.sql.SQLContext.<init>(SQLContext.scala:234)
at
org.apache.spark.sql.hive.HiveContext.<init>(HiveContext.scala:72)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at
org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
at
org.apache.spark.repl.SparkILoopExt.importSpark(SparkILoopExt.scala:154)
at
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply$mcZ$sp(SparkILoopExt.scala:127)
at
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)
at
org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)

Best Regards,

Jerry


Re: [Spark-SQL]: Disable HiveContext from instantiating in spark-shell

2015-11-06 Thread Jerry Lam
What is interesting is that the pyspark shell works fine with multiple sessions 
on the same host, even though multiple HiveContexts have been created. What does 
pyspark do differently in terms of starting up the shell?

> On Nov 6, 2015, at 12:12 PM, Ted Yu <yuzhih...@gmail.com> wrote:
> 
> In SQLContext.scala :
> // After we have populated SQLConf, we call setConf to populate other 
> confs in the subclass
> // (e.g. hiveconf in HiveContext).
> properties.foreach {
>   case (key, value) => setConf(key, value)
> }
> 
> I don't see config of skipping the above call.
> 
> FYI
> 
> On Fri, Nov 6, 2015 at 8:53 AM, Jerry Lam <chiling...@gmail.com 
> <mailto:chiling...@gmail.com>> wrote:
> Hi spark users and developers,
> 
> Is it possible to disable HiveContext from being instantiated when using 
> spark-shell? I got the following errors when I have more than one session 
> starts. Since I don't use HiveContext, it would be great if I can have more 
> than 1 spark-shell start at the same time. 
> 
> Exception in thread "main" java.lang.RuntimeException: 
> java.lang.RuntimeException: Unable to instantiate 
> org.apache.hadoop.hive.ql.metadata.SessionHiveMetaS
> toreClient
> at 
> org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:522)
> at 
> org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:171)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at 
> org.apache.spark.sql.hive.client.IsolatedClientLoader.liftedTree1$1(IsolatedClientLoader.scala:183)
> at 
> org.apache.spark.sql.hive.client.IsolatedClientLoader.(IsolatedClientLoader.scala:179)
> at 
> org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:226)
> at 
> org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:185)
> at 
> org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:392)
> at 
> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:235)
> at 
> org.apache.spark.sql.SQLContext$$anonfun$5.apply(SQLContext.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:727)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at org.apache.spark.sql.SQLContext.(SQLContext.scala:234)
> at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:72)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at 
> org.apache.spark.repl.SparkILoop.createSQLContext(SparkILoop.scala:1028)
> at 
> org.apache.spark.repl.SparkILoopExt.importSpark(SparkILoopExt.scala:154)
> at 
> org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply$mcZ$sp(SparkILoopExt.scala:127)
> at 
> org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)
> at 
> org.apache.spark.repl.SparkILoopExt$$anonfun$process$1.apply(SparkILoopExt.scala:113)
> 
> Best Regards,
> 
> Jerry
> 



Re: Spark EC2 script on Large clusters

2015-11-05 Thread Jerry Lam
Does Qubole use Yarn or Mesos for resource management?

Sent from my iPhone

> On 5 Nov, 2015, at 9:02 pm, Sabarish Sasidharan 
>  wrote:
> 
> Qubole




Re: Please reply if you use Mesos fine grained mode

2015-11-03 Thread Jerry Lam
We "used" Spark on Mesos to build interactive data analysis platform
because the interactive session could be long and might not use Spark for
the entire session. It is very wasteful of resources if we used the
coarse-grained mode because it keeps resource for the entire session.
Therefore, fine-grained mode was used.

Knowing that Spark now supports dynamic resource allocation with coarse
grained mode, we were thinking about using it. However, we decided to
switch to Yarn because in addition to dynamic allocation, it has better
supports on security.
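
For anyone trying the coarse-grained + dynamic allocation route, a minimal sketch of the settings involved (check the docs for your Spark version; dynamic allocation also requires the external shuffle service to be set up on the cluster, and the min/max values below are placeholders):

```scala
import org.apache.spark.SparkConf

// Sketch only: enable dynamic allocation on top of coarse-grained scheduling.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "50")
```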

On Tue, Nov 3, 2015 at 7:22 PM, Soren Macbeth  wrote:

> we use fine-grained mode. coarse-grained mode keeps JVMs around which
> often leads to OOMs, which in turn kill the entire executor, causing entire
> stages to be retried. In fine-grained mode, only the task fails and
> subsequently gets retried without taking out an entire stage or worse.
>
> On Tue, Nov 3, 2015 at 3:54 PM, Reynold Xin  wrote:
>
>> If you are using Spark with Mesos fine grained mode, can you please
>> respond to this email explaining why you use it over the coarse grained
>> mode?
>>
>> Thanks.
>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
I agree the max date will satisfy the latest-date requirement, but it does
not satisfy the second-last-date requirement you mentioned.

Just for your information, before you invest too much in the partitioned
table, I want to warn you that it has memory issues (on both the executor
and driver side). A simple experiment shows that if you have over 10 years
of dates (3650 directories), it takes a long time to initialize. I got to
know the limitation after I tried to partition user events by their
user_id. It was a disaster (>1 user_id).

I hope the Spark developers can address the memory limitations, because
partitioned tables are very useful in many cases.
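
One way to sidestep both problems (a sketch, with an assumed path and column name) is to list the date= directories directly with the Hadoop FileSystem API, pick the latest or second-latest date yourself, and then filter on it, instead of relying on Spark's partition discovery over the whole tree:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val dates = fs.listStatus(new Path("hdfs:///user/koert/data"))  // assumed root path
  .map(_.getPath.getName)
  .filter(_.startsWith("date="))
  .map(_.stripPrefix("date="))
  .sorted

val latest = dates.last              // or dates(dates.length - 2) for the second latest
val df = sqlContext.read.parquet("hdfs:///user/koert/data")
val latestOnly = df.filter(df("date") === latest)
```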

Cheers ~



On Sun, Nov 1, 2015 at 4:39 PM, Koert Kuipers <ko...@tresata.com> wrote:

> i was going for the distinct approach, since i want it to be general
> enough to also solve other related problems later. the max-date is likely
> to be faster though.
>
> On Sun, Nov 1, 2015 at 4:36 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Koert,
>>
>> You should be able to see if it requires scanning the whole data by
>> "explain" the query. The physical plan should say something about it. I
>> wonder if you are trying the distinct-sort-by-limit approach or the
>> max-date approach?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>> On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>
>>> it seems pretty fast, but if i have 2 partitions and 10mm records i do
>>> have to dedupe (distinct) 10mm records
>>>
>>> a direct way to just find out what the 2 partitions are would be much
>>> faster. spark knows it, but its not exposed.
>>>
>>> On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> it seems to work but i am not sure if its not scanning the whole
>>>> dataset. let me dig into tasks a a bit
>>>>
>>>> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>>
>>>>> Hi Koert,
>>>>>
>>>>> If the partitioned table is implemented properly, I would think
>>>>> "select distinct(date) as dt from table order by dt DESC limit 1" would
>>>>> return the latest dates without scanning the whole dataset. I haven't try
>>>>> it that myself. It would be great if you can report back if this actually
>>>>> works or not. :)
>>>>>
>>>>> Best Regards,
>>>>>
>>>>> Jerry
>>>>>
>>>>>
>>>>> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers <ko...@tresata.com>
>>>>> wrote:
>>>>>
>>>>>> hello all,
>>>>>> i am trying to get familiar with spark sql partitioning support.
>>>>>>
>>>>>> my data is partitioned by date, so like this:
>>>>>> data/date=2015-01-01
>>>>>> data/date=2015-01-02
>>>>>> data/date=2015-01-03
>>>>>> ...
>>>>>>
>>>>>> lets say i would like a batch process to read data for the latest
>>>>>> date only. how do i proceed?
>>>>>> generally the latest date will be yesterday, but it could be a day
>>>>>> older or maybe 2.
>>>>>>
>>>>>> i understand that i will have to do something like:
>>>>>> df.filter(df("date") === some_date_string_here)
>>>>>>
>>>>>> however i do now know what some_date_string_here should be. i would
>>>>>> like to inspect the available dates and pick the latest. is there an
>>>>>> efficient way to  find out what the available partitions are?
>>>>>>
>>>>>> thanks! koert
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
Hi Koert,

You should be able to see whether it requires scanning the whole dataset by
running "explain" on the query; the physical plan should say something about
it. I wonder if you are trying the distinct-sort-by-limit approach or the
max-date approach?
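
For example, a sketch of checking the distinct-sort-by-limit variant with the DataFrame API (assuming `df` is the partitioned table):

```scala
import org.apache.spark.sql.functions.col

val latestDate = df.select("date").distinct()
  .orderBy(col("date").desc)
  .limit(1)

latestDate.explain(true)  // extended plan: look at what the scan actually reads
latestDate.show()
```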

Best Regards,

Jerry


On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers <ko...@tresata.com> wrote:

> it seems pretty fast, but if i have 2 partitions and 10mm records i do
> have to dedupe (distinct) 10mm records
>
> a direct way to just find out what the 2 partitions are would be much
> faster. spark knows it, but its not exposed.
>
> On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> it seems to work but i am not sure if its not scanning the whole dataset.
>> let me dig into tasks a a bit
>>
>> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi Koert,
>>>
>>> If the partitioned table is implemented properly, I would think "select
>>> distinct(date) as dt from table order by dt DESC limit 1" would return the
>>> latest dates without scanning the whole dataset. I haven't try it that
>>> myself. It would be great if you can report back if this actually works or
>>> not. :)
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> hello all,
>>>> i am trying to get familiar with spark sql partitioning support.
>>>>
>>>> my data is partitioned by date, so like this:
>>>> data/date=2015-01-01
>>>> data/date=2015-01-02
>>>> data/date=2015-01-03
>>>> ...
>>>>
>>>> lets say i would like a batch process to read data for the latest date
>>>> only. how do i proceed?
>>>> generally the latest date will be yesterday, but it could be a day
>>>> older or maybe 2.
>>>>
>>>> i understand that i will have to do something like:
>>>> df.filter(df("date") === some_date_string_here)
>>>>
>>>> however i do now know what some_date_string_here should be. i would
>>>> like to inspect the available dates and pick the latest. is there an
>>>> efficient way to  find out what the available partitions are?
>>>>
>>>> thanks! koert
>>>>
>>>>
>>>>
>>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
Hi Koert,

the physical plan looks like it is doing the right thing:

it reads the partitioned table hdfs://user/koert/test, derives the date from
the directory names, hash-partitions and aggregates to find the distinct
dates, and finally shuffles the dates for the sort and limit 1 operations.

This is my understanding of the physical plan; you can navigate the actual
execution in the web UI to see how much data is actually read to satisfy
this request. I hope it only requires a few bytes for a few dates.

Best Regards,

Jerry


On Sun, Nov 1, 2015 at 5:56 PM, Jerry Lam <chiling...@gmail.com> wrote:

> I agreed the max date will satisfy the latest date requirement but it does
> not satisfy the second last date requirement you mentioned.
>
> Just for your information, before you invested in the partitioned table
> too much, I want to warn you that it has memory issues (both on executors
> and driver side). A simple experiment can show that if you have over 10
> years of date (3650 directories), it takes a long time to initialize. I got
> to know the limitation after I tried to partition user events per their
> user_id. It was a disaster (>1 user_id).
>
> I hope the spark developer can address the memory limitations because
> partitioned table is very useful in many cases.
>
> Cheers ~
>
>
>
> On Sun, Nov 1, 2015 at 4:39 PM, Koert Kuipers <ko...@tresata.com> wrote:
>
>> i was going for the distinct approach, since i want it to be general
>> enough to also solve other related problems later. the max-date is likely
>> to be faster though.
>>
>> On Sun, Nov 1, 2015 at 4:36 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi Koert,
>>>
>>> You should be able to see if it requires scanning the whole data by
>>> "explain" the query. The physical plan should say something about it. I
>>> wonder if you are trying the distinct-sort-by-limit approach or the
>>> max-date approach?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Sun, Nov 1, 2015 at 4:25 PM, Koert Kuipers <ko...@tresata.com> wrote:
>>>
>>>> it seems pretty fast, but if i have 2 partitions and 10mm records i do
>>>> have to dedupe (distinct) 10mm records
>>>>
>>>> a direct way to just find out what the 2 partitions are would be much
>>>> faster. spark knows it, but its not exposed.
>>>>
>>>> On Sun, Nov 1, 2015 at 4:08 PM, Koert Kuipers <ko...@tresata.com>
>>>> wrote:
>>>>
>>>>> it seems to work but i am not sure if its not scanning the whole
>>>>> dataset. let me dig into tasks a a bit
>>>>>
>>>>> On Sun, Nov 1, 2015 at 3:18 PM, Jerry Lam <chiling...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Koert,
>>>>>>
>>>>>> If the partitioned table is implemented properly, I would think
>>>>>> "select distinct(date) as dt from table order by dt DESC limit 1" would
>>>>>> return the latest dates without scanning the whole dataset. I haven't try
>>>>>> it that myself. It would be great if you can report back if this actually
>>>>>> works or not. :)
>>>>>>
>>>>>> Best Regards,
>>>>>>
>>>>>> Jerry
>>>>>>
>>>>>>
>>>>>> On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers <ko...@tresata.com>
>>>>>> wrote:
>>>>>>
>>>>>>> hello all,
>>>>>>> i am trying to get familiar with spark sql partitioning support.
>>>>>>>
>>>>>>> my data is partitioned by date, so like this:
>>>>>>> data/date=2015-01-01
>>>>>>> data/date=2015-01-02
>>>>>>> data/date=2015-01-03
>>>>>>> ...
>>>>>>>
>>>>>>> lets say i would like a batch process to read data for the latest
>>>>>>> date only. how do i proceed?
>>>>>>> generally the latest date will be yesterday, but it could be a day
>>>>>>> older or maybe 2.
>>>>>>>
>>>>>>> i understand that i will have to do something like:
>>>>>>> df.filter(df("date") === some_date_string_here)
>>>>>>>
>>>>>>> however i do now know what some_date_string_here should be. i would
>>>>>>> like to inspect the available dates and pick the latest. is there an
>>>>>>> efficient way to  find out what the available partitions are?
>>>>>>>
>>>>>>> thanks! koert
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: spark sql partitioned by date... read last date

2015-11-01 Thread Jerry Lam
Hi Koert,

If the partitioned table is implemented properly, I would think "select
distinct(date) as dt from table order by dt DESC limit 1" would return the
latest date without scanning the whole dataset. I haven't tried it myself.
It would be great if you could report back whether this actually works. :)
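
For reference, a rough sketch of the two approaches being discussed, using the
DataFrame API on a date-partitioned directory (the path and column names are
illustrative, not taken from a real job):

import org.apache.spark.sql.functions.{desc, max}

val df = sqlContext.read.parquet("data")   // data/date=2015-01-01, data/date=2015-01-02, ...

// approach 1: distinct + sort + limit, mirroring the SQL above
val latestViaDistinct = df.select("date").distinct().orderBy(desc("date")).limit(1)

// approach 2: take the max of the partition column
val latestViaMax = df.agg(max("date").as("dt"))

// explain() on either query should show whether only partition metadata is read,
// before plugging the resulting value into df.filter(df("date") === someDateString).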

Best Regards,

Jerry


On Sun, Nov 1, 2015 at 3:03 PM, Koert Kuipers  wrote:

> hello all,
> i am trying to get familiar with spark sql partitioning support.
>
> my data is partitioned by date, so like this:
> data/date=2015-01-01
> data/date=2015-01-02
> data/date=2015-01-03
> ...
>
> lets say i would like a batch process to read data for the latest date
> only. how do i proceed?
> generally the latest date will be yesterday, but it could be a day older
> or maybe 2.
>
> i understand that i will have to do something like:
> df.filter(df("date") === some_date_string_here)
>
> however i do now know what some_date_string_here should be. i would like
> to inspect the available dates and pick the latest. is there an efficient
> way to  find out what the available partitions are?
>
> thanks! koert
>
>
>


Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Jerry Lam
Hi Bryan,

Did you read the email I sent a few days ago? There are more issues with
partitionBy down the road:
https://www.mail-archive.com/user@spark.apache.org/msg39512.html


Best Regards,

Jerry

> On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey  wrote:
> 
> The second issue I'm seeing is an OOM issue when writing partitioned data.  I 
> am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive libraries 
> packaged with Spark.  Spark was compiled using the following:  mvn 
> -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive 
> -Phive-thriftserver package
> 
> Given a case class like the following:
> 
> case class HiveWindowsEvent(
>  targetEntity: String,
>  targetEntityType: String,
>  dateTimeUtc: Timestamp,
>  eventid: String,
>  eventData: Map[String, String],
>  description: String,
>  eventRecordId: String,
>  level: String,
>  machineName: String,
>  sequenceNumber: String,
>  source: String,
>  sourceMachineName: String,
>  taskCategory: String,
>  user: String,
>  machineIp: String,
>  additionalData: Map[String, String],
>  windowseventtimebin: Long
>  )
> 
> The command to write data works fine (and when queried via Beeline data is 
> correct):
> 
> val hc = new HiveContext(sc)
> import hc.implicits._
> 
> val partitioner = new HashPartitioner(5)
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDF = rdd.toDF()
>   eventsDF
> .write
> .mode(SaveMode.Append).saveAsTable("windows_event9")
> })
> 
> Once I add the partitioning (few partitions - three or less):
> 
> val hc = new HiveContext(sc)
> import hc.implicits._
> 
> val partitioner = new HashPartitioner(5)
> hiveWindowsEvents.foreachRDD(rdd => {
>   val eventsDF = rdd.toDF()
>   eventsDF
> .write
> .partitionBy("windowseventtimebin")
> .mode(SaveMode.Append).saveAsTable("windows_event9")
> })
> 
> I see the following error when writing to (3) partitions:
> 
> 15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 
> 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
> at org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org 
> $apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
> at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at 
> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.OutOfMemoryError: Java heap space
> at 
> parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
> at 
> parquet.bytes.CapacityByteArrayOutputStream.(CapacityByteArrayOutputStream.java:57)
> at 
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:68)
> at 
> parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.(ColumnChunkPageWriteStore.java:48)
> at 
> parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
> at 
> parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
> at 
> parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
> at 
> parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.(MessageColumnIO.java:178)
> at 
> parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
> at 
> parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
> at 
> parquet.hadoop.InternalParquetRecordWriter.(InternalParquetRecordWriter.java:94)
> at 
> parquet.hadoop.ParquetRecordWriter.(ParquetRecordWriter.java:64)
> at 
> 

Re: Spark -- Writing to Partitioned Persistent Table

2015-10-28 Thread Jerry Lam
Hi Bryan,

I think they fixed some memory issues in 1.4 for the partitioned table
implementation; 1.5 does much better in terms of executor memory usage when
generating partitioned tables. However, if your table has more than a thousand
partitions, reading them can be challenging: it takes a while to initialize the
partitioned table and it requires a lot of memory on the driver. I would not
use it if the number of partitions goes over a few hundred.
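
As a quick sanity check before committing to partitionBy, you could count the
distinct values of the intended partition column (a sketch, reusing the
eventsDF from the quoted code below):

import org.apache.spark.sql.functions.countDistinct

// number of directories that partitionBy("windowseventtimebin") would create
val numBins = eventsDF.agg(countDistinct("windowseventtimebin")).first().getLong(0)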

Hope this helps,

Jerry

Sent from my iPhone

> On 28 Oct, 2015, at 6:33 pm, Bryan <bryan.jeff...@gmail.com> wrote:
> 
> Jerry,
> 
> Thank you for the note. It sounds like you were able to get further than I 
> have been - any insight? Just a Spark 1.4.1 vs Spark 1.5?
> 
> Regards,
> 
> Bryan Jeffrey
> From: Jerry Lam
> Sent: ‎10/‎28/‎2015 6:29 PM
> To: Bryan Jeffrey
> Cc: Susan Zhang; user
> Subject: Re: Spark -- Writing to Partitioned Persistent Table
> 
> Hi Bryan,
> 
> Did you read the email I sent few days ago. There are more issues with 
> partitionBy down the road: 
> https://www.mail-archive.com/user@spark.apache.org/msg39512.html
> 
> Best Regards,
> 
> Jerry
> 
>> On Oct 28, 2015, at 4:52 PM, Bryan Jeffrey <bryan.jeff...@gmail.com> wrote:
>> 
>> The second issue I'm seeing is an OOM issue when writing partitioned data.  
>> I am running Spark 1.4.1, Scala 2.11, Hadoop 2.6.1 & using the Hive 
>> libraries packaged with Spark.  Spark was compiled using the following:  mvn 
>> -Dhadoop.version=2.6.1 -Dscala-2.11 -DskipTests -Pyarn -Phive 
>> -Phive-thriftserver package
>> 
>> Given a case class like the following:
>> 
>> case class HiveWindowsEvent(
>>  targetEntity: String,
>>  targetEntityType: String,
>>  dateTimeUtc: Timestamp,
>>  eventid: String,
>>  eventData: Map[String, String],
>>  description: String,
>>  eventRecordId: String,
>>  level: String,
>>  machineName: String,
>>  sequenceNumber: String,
>>  source: String,
>>  sourceMachineName: String,
>>  taskCategory: String,
>>  user: String,
>>  machineIp: String,
>>  additionalData: Map[String, String],
>>  windowseventtimebin: Long
>>  )
>> 
>> The command to write data works fine (and when queried via Beeline data is 
>> correct):
>> 
>> val hc = new HiveContext(sc)
>> import hc.implicits._
>> 
>> val partitioner = new HashPartitioner(5)
>> hiveWindowsEvents.foreachRDD(rdd => {
>>   val eventsDF = rdd.toDF()
>>   eventsDF
>> .write
>> .mode(SaveMode.Append).saveAsTable("windows_event9")
>> })
>> 
>> Once I add the partitioning (few partitions - three or less):
>> 
>> val hc = new HiveContext(sc)
>> import hc.implicits._
>> 
>> val partitioner = new HashPartitioner(5)
>> hiveWindowsEvents.foreachRDD(rdd => {
>>   val eventsDF = rdd.toDF()
>>   eventsDF
>> .write
>> .partitionBy("windowseventtimebin")
>> .mode(SaveMode.Append).saveAsTable("windows_event9")
>> })
>> 
>> I see the following error when writing to (3) partitions:
>> 
>> 15/10/28 20:23:01 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 
>> 10.0.0.6): org.apache.spark.SparkException: Task failed while writing rows.
>> at 
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$2(commands.scala:270)
>> at 
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>> at 
>> org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insertWithDynamicPartitions$3.apply(commands.scala:229)
>> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
>> at org.apache.spark.scheduler.Task.run(Task.scala:70)
>> at 
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
>> at 
>> java.util.concurrent.ThreadPoolExecutor.ru

[Spark-SQL]: Unable to propagate hadoop configuration after SparkContext is initialized

2015-10-27 Thread Jerry Lam
Hi Spark users and developers,

Has anyone experienced issues setting Hadoop configurations after the
SparkContext is initialized? I'm using Spark 1.5.1.

I'm trying to use s3a, which requires the access and secret keys to be set in
the Hadoop configuration. I tried to set the properties in the Hadoop
configuration from the SparkContext.

sc.hadoopConfiguration.set("fs.s3a.access.key", AWSAccessKeyId)
sc.hadoopConfiguration.set("fs.s3a.secret.key", AWSSecretKey)

val sqlContext = new SQLContext(sc)
val df = sqlContext.read.parquet("s3a://parquetfiles")

So far so good: I saw a job submitted to read the Parquet schema, and it
returned successfully.

and then I tried to do:

df.count

This failed with AmazonClientException:

com.amazonaws.AmazonClientException: Unable to load AWS credentials from
any provider in the chain
at
com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3521)
at
com.amazonaws.services.s3.AmazonS3Client.headBucket(AmazonS3Client.java:1031)
at
com.amazonaws.services.s3.AmazonS3Client.doesBucketExist(AmazonS3Client.java:994)
at org.apache.hadoop.fs.s3a.S3AFileSystem.initialize(S3AFileSystem.java:297)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:384)
at
org.apache.parquet.hadoop.ParquetRecordReader.initializeInternalReader(ParquetRecordReader.java:157)
at
org.apache.parquet.hadoop.ParquetRecordReader.initialize(ParquetRecordReader.java:140)
at
org.apache.spark.rdd.SqlNewHadoopRDD$$anon$1.(SqlNewHadoopRDD.scala:155)
at org.apache.spark.rdd.SqlNewHadoopRDD.compute(SqlNewHadoopRDD.scala:120)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Any idea why it can read the schema from the Parquet file but cannot process
the file? It feels like the Hadoop configuration is not sent to the executors
for some reason...

Thanks,

Jerry


Re: [Spark-SQL]: Unable to propagate hadoop configuration after SparkContext is initialized

2015-10-27 Thread Jerry Lam
Hi Marcelo,

Thanks for the advice. I understand that we could set the configurations
before creating the SparkContext. My question is that
SparkContext.hadoopConfiguration.set("key", "value") doesn't seem to
propagate to all subsequent SQLContext jobs. Note that I mentioned I can
load the Parquet file but I cannot perform a count on it because of the
AmazonClientException. It means the credentials are used when loading the
Parquet file but not when processing it. How can this happen?

Best Regards,

Jerry


On Tue, Oct 27, 2015 at 2:05 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> On Tue, Oct 27, 2015 at 10:43 AM, Jerry Lam <chiling...@gmail.com> wrote:
> > Anyone experiences issues in setting hadoop configurations after
> > SparkContext is initialized? I'm using Spark 1.5.1.
> >
> > I'm trying to use s3a which requires access and secret key set into
> hadoop
> > configuration. I tried to set the properties in the hadoop configuration
> > from sparktcontext.
> >
> > sc.hadoopConfiguration.set("fs.s3a.access.key", AWSAccessKeyId)
> > sc.hadoopConfiguration.set("fs.s3a.secret.key", AWSSecretKey)
>
> Try setting "spark.hadoop.fs.s3a.access.key" and
> "spark.hadoop.fs.s3a.secret.key" in your SparkConf before creating the
> SparkContext.
>
> --
> Marcelo
>


Re: [Spark-SQL]: Unable to propagate hadoop configuration after SparkContext is initialized

2015-10-27 Thread Jerry Lam
Hi Marcelo,

I tried setting the properties before instantiating the SparkContext via
SparkConf, and it works fine. Originally the code read the Hadoop
configuration from hdfs-site.xml, which also works perfectly. Therefore, can
I conclude that sparkContext.hadoopConfiguration.set("key", "value") does not
propagate to all SQL jobs within the same SparkContext? I haven't tried it
with Spark Core, so I cannot tell.

Is there a workaround, given that this seems to be broken? I need to do this
programmatically after the SparkContext is instantiated, not before...
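
For reference, a minimal sketch of the SparkConf-based variant that worked
(using the same AWSAccessKeyId/AWSSecretKey values as in my first email):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf()
  .setAppName("s3a-test")
  .set("spark.hadoop.fs.s3a.access.key", AWSAccessKeyId) // spark.hadoop.* entries are copied into hadoopConfiguration
  .set("spark.hadoop.fs.s3a.secret.key", AWSSecretKey)
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

sqlContext.read.parquet("s3a://parquetfiles").count() // no AmazonClientException with this setup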

Best Regards,

Jerry

On Tue, Oct 27, 2015 at 2:30 PM, Marcelo Vanzin <van...@cloudera.com> wrote:

> If setting the values in SparkConf works, there's probably some bug in
> the SQL code; e.g. creating a new Configuration object instead of
> using the one in SparkContext. But I'm not really familiar with that
> code.
>
> On Tue, Oct 27, 2015 at 11:22 AM, Jerry Lam <chiling...@gmail.com> wrote:
> > Hi Marcelo,
> >
> > Thanks for the advice. I understand that we could set the configurations
> > before creating SparkContext. My question is
> > SparkContext.hadoopConfiguration.set("key","value") doesn't seem to
> > propagate to all subsequent SQLContext jobs. Note that I mentioned I can
> > load the parquet file but I cannot perform a count on the parquet file
> > because of the AmazonClientException. It means that the credential is
> used
> > during the loading of the parquet but not when we are processing the
> parquet
> > file. How this can happen?
> >
> > Best Regards,
> >
> > Jerry
> >
> >
> > On Tue, Oct 27, 2015 at 2:05 PM, Marcelo Vanzin <van...@cloudera.com>
> wrote:
> >>
> >> On Tue, Oct 27, 2015 at 10:43 AM, Jerry Lam <chiling...@gmail.com>
> wrote:
> >> > Anyone experiences issues in setting hadoop configurations after
> >> > SparkContext is initialized? I'm using Spark 1.5.1.
> >> >
> >> > I'm trying to use s3a which requires access and secret key set into
> >> > hadoop
> >> > configuration. I tried to set the properties in the hadoop
> configuration
> >> > from sparktcontext.
> >> >
> >> > sc.hadoopConfiguration.set("fs.s3a.access.key", AWSAccessKeyId)
> >> > sc.hadoopConfiguration.set("fs.s3a.secret.key", AWSSecretKey)
> >>
> >> Try setting "spark.hadoop.fs.s3a.access.key" and
> >> "spark.hadoop.fs.s3a.secret.key" in your SparkConf before creating the
> >> SparkContext.
> >>
> >> --
> >> Marcelo
> >
> >
>
>
>
> --
> Marcelo
>


Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-26 Thread Jerry Lam
Hi Fengdong,

Why does it need more memory on the driver side when there are many
partitions? It seems the implementation can only support use cases with a
dozen or so partitions; when it goes over 100 it falls apart. It is also quite
slow to initialize the loading of partitioned tables when the number of
partitions is over 100.

Best Regards,

Jerry

Sent from my iPhone

> On 26 Oct, 2015, at 2:50 am, Fengdong Yu <fengdo...@everstring.com> wrote:
> 
> How many partitions you generated?
> if Millions generated, then there is a huge memory consumed.
> 
> 
> 
> 
> 
>> On Oct 26, 2015, at 10:58 AM, Jerry Lam <chiling...@gmail.com> wrote:
>> 
>> Hi guys,
>> 
>> I mentioned that the partitions are generated so I tried to read the 
>> partition data from it. The driver is OOM after few minutes. The stack trace 
>> is below. It looks very similar to the the jstack above (note on the refresh 
>> method). Thanks!
>> 
>> Name: java.lang.OutOfMemoryError
>> Message: GC overhead limit exceeded
>> StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
>> java.lang.StringBuilder.append(StringBuilder.java:132)
>> org.apache.hadoop.fs.Path.toString(Path.java:384)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>> scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
>> org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
>> scala.Option.getOrElse(Option.scala:120)
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
>> org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
>> org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:31)
>> org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
>> org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)
>> 
>>> On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>> Hi Josh,
>>> 
>>> No I don't have speculation enabled. The driver took about few hours until 
>>> it was OOM. Interestingly, all partitions are generated successfully 
>>> (_SUCCESS file is written in the output directory). Is there a reason why 
>>> the driver needs so much memory? The jstack revealed that it called refresh 
>>> some file statuses. Is there a way to avoid OutputCommitCoordinator to use 
>>> so much memory? 
>>> 
>>> Ultimately, I choose to use partitions because most of the queries I have 
>>> will execute based the partition fiel

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-25 Thread Jerry Lam
Hi Josh,

No, I don't have speculation enabled. The driver ran for a few hours until it
was OOM. Interestingly, all partitions are generated successfully (the
_SUCCESS file is written in the output directory). Is there a reason why the
driver needs so much memory? The jstack revealed that it called refresh on
some file statuses. Is there a way to keep the OutputCommitCoordinator from
using so much memory?

Ultimately, I chose to use partitions because most of the queries I have
filter on the partition field. For example, "SELECT events from customer
where customer_id = 1234". If the data is partitioned by customer_id, all
events for a customer can be retrieved without filtering the entire dataset,
which is much more efficient (I hope). However, I notice that the
implementation of the partitioning logic does not seem to allow this type of
use case without using a lot of memory, which is a bit odd in my opinion. Any
help will be greatly appreciated.
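
To make the intended access pattern concrete, a minimal sketch (the events
DataFrame and the s3 path are illustrative):

import org.apache.spark.sql.functions.col

// the write that produces one output directory per customer_id
events.write
  .partitionBy("customer_id")
  .parquet("s3a://bucket/events")

// the read where the filter on the partition column should prune to a single directory
val oneCustomer = sqlContext.read
  .parquet("s3a://bucket/events")
  .filter(col("customer_id") === 1234)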

Best Regards,

Jerry



On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenvi...@gmail.com> wrote:

> Hi Jerry,
>
> Do you have speculation enabled? A write which produces one million files
> / output partitions might be using tons of driver memory via the
> OutputCommitCoordinator's bookkeeping data structures.
>
> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi spark guys,
>>
>> I think I hit the same issue SPARK-8890
>> https://issues.apache.org/jira/browse/SPARK-8890. It is marked as
>> resolved. However it is not. I have over a million output directories for 1
>> single column in partitionBy. Not sure if this is a regression issue? Do I
>> need to set some parameters to make it more memory efficient?
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>>
>>
>> On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi guys,
>>>
>>> After waiting for a day, it actually causes OOM on the spark driver. I
>>> configure the driver to have 6GB. Note that I didn't call refresh myself.
>>> The method was called when saving the dataframe in parquet format. Also I'm
>>> using partitionBy() on the DataFrameWriter to generate over 1 million
>>> files. Not sure why it OOM the driver after the job is marked _SUCCESS in
>>> the output folder.
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>>
>>> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi Spark users and developers,
>>>>
>>>> Does anyone encounter any issue when a spark SQL job produces a lot of
>>>> files (over 1 millions), the job hangs on the refresh method? I'm using
>>>> spark 1.5.1. Below is the stack trace. I saw the parquet files are produced
>>>> but the driver is doing something very intensively (it uses all the cpus).
>>>> Does it mean Spark SQL cannot be used to produce over 1 million files in a
>>>> single job?
>>>>
>>>> Thread 528: (state = BLOCKED)
>>>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled
>>>> frame)
>>>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43,
>>>> line=130 (Compiled frame)
>>>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12,
>>>> line=114 (Compiled frame)
>>>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
>>>> line=415 (Compiled frame)
>>>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
>>>> (Compiled frame)
>>>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled
>>>> frame)
>>>>  -
>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
>>>> @bci=4, line=447 (Compiled frame)
>>>>  -
>>>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
>>>> @bci=5, line=447 (Compiled frame)
>>>>  -
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>> @bci=9, line=244 (Compiled frame)
>>>>  -
>>>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>>>> @bci=2, line=244 (Compiled frame)
>>>>  -
>>>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>>>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>>>  - scala.collection.mutable.Arr

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-25 Thread Jerry Lam
Hi guys,

I mentioned that the partitions are generated, so I tried to read the
partition data back. The driver went OOM after a few minutes. The stack trace
is below. It looks very similar to the jstack above (note the refresh
method). Thanks!

Name: java.lang.OutOfMemoryError
Message: GC overhead limit exceeded
StackTrace: java.util.Arrays.copyOf(Arrays.java:2367)
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:415)
java.lang.StringBuilder.append(StringBuilder.java:132)
org.apache.hadoop.fs.Path.toString(Path.java:384)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(interfaces.scala:447)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(interfaces.scala:447)
org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh(interfaces.scala:453)
org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute(interfaces.scala:465)
org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache(interfaces.scala:463)
org.apache.spark.sql.sources.HadoopFsRelation.cachedLeafStatuses(interfaces.scala:470)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$MetadataCache.refresh(ParquetRelation.scala:381)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache$lzycompute(ParquetRelation.scala:145)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.org$apache$spark$sql$execution$datasources$parquet$ParquetRelation$$metadataCache(ParquetRelation.scala:143)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation$$anonfun$6.apply(ParquetRelation.scala:196)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.dataSchema(ParquetRelation.scala:196)
org.apache.spark.sql.sources.HadoopFsRelation.schema$lzycompute(interfaces.scala:561)
org.apache.spark.sql.sources.HadoopFsRelation.schema(interfaces.scala:560)
org.apache.spark.sql.execution.datasources.LogicalRelation.(LogicalRelation.scala:31)
org.apache.spark.sql.SQLContext.baseRelationToDataFrame(SQLContext.scala:395)
org.apache.spark.sql.DataFrameReader.parquet(DataFrameReader.scala:267)


On Sun, Oct 25, 2015 at 10:25 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Josh,
>
> No I don't have speculation enabled. The driver took about few hours until
> it was OOM. Interestingly, all partitions are generated successfully
> (_SUCCESS file is written in the output directory). Is there a reason why
> the driver needs so much memory? The jstack revealed that it called refresh
> some file statuses. Is there a way to avoid OutputCommitCoordinator to
> use so much memory?
>
> Ultimately, I choose to use partitions because most of the queries I have
> will execute based the partition field. For example, "SELECT events from
> customer where customer_id = 1234". If the partition is based on
> customer_id, all events for a customer can be easily retrieved without
> filtering the entire dataset which is much more efficient (I hope).
> However, I notice that the implementation of the partition logic does not
> seem to allow this type of use cases without using a lot of memory which is
> a bit odd in my opinion. Any help will be greatly appreciated.
>
> Best Regards,
>
> Jerry
>
>
>
> On Sun, Oct 25, 2015 at 9:25 PM, Josh Rosen <rosenvi...@gmail.com> wrote:
>
>> Hi Jerry,
>>
>> Do you have speculation enabled? A write which produces one million files
>> / output partitions might be using tons of driver memory via the
>> OutputCommitCoordinator's bookkeeping data structures.
>>
>> On Sun, Oct 25, 2015 at 5:50 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi spark guys,
>>>
>>> I think I hit the same issue SPARK-8890
>>> https://issues.apache.org/jira/browse/SPARK-8890. It is marked as
>>

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-25 Thread Jerry Lam
Hi spark guys,

I think I hit the same issue as SPARK-8890
(https://issues.apache.org/jira/browse/SPARK-8890). It is marked as resolved;
however, it is not. I have over a million output directories for a single
column in partitionBy. Not sure if this is a regression? Do I need to set
some parameters to make it more memory efficient?

Best Regards,

Jerry




On Sun, Oct 25, 2015 at 8:39 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi guys,
>
> After waiting for a day, it actually causes OOM on the spark driver. I
> configure the driver to have 6GB. Note that I didn't call refresh myself.
> The method was called when saving the dataframe in parquet format. Also I'm
> using partitionBy() on the DataFrameWriter to generate over 1 million
> files. Not sure why it OOM the driver after the job is marked _SUCCESS in
> the output folder.
>
> Best Regards,
>
> Jerry
>
>
> On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi Spark users and developers,
>>
>> Does anyone encounter any issue when a spark SQL job produces a lot of
>> files (over 1 millions), the job hangs on the refresh method? I'm using
>> spark 1.5.1. Below is the stack trace. I saw the parquet files are produced
>> but the driver is doing something very intensively (it uses all the cpus).
>> Does it mean Spark SQL cannot be used to produce over 1 million files in a
>> single job?
>>
>> Thread 528: (state = BLOCKED)
>>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
>>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130
>> (Compiled frame)
>>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12,
>> line=114 (Compiled frame)
>>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
>> line=415 (Compiled frame)
>>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
>> (Compiled frame)
>>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled
>> frame)
>>  -
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
>> @bci=4, line=447 (Compiled frame)
>>  -
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
>> @bci=5, line=447 (Compiled frame)
>>  -
>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>> @bci=9, line=244 (Compiled frame)
>>  -
>> scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
>> @bci=2, line=244 (Compiled frame)
>>  -
>> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
>> scala.Function1) @bci=22, line=33 (Compiled frame)
>>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
>> @bci=2, line=108 (Compiled frame)
>>  -
>> scala.collection.TraversableLike$class.map(scala.collection.TraversableLike,
>> scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244
>> (Compiled frame)
>>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1,
>> scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>>  -
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
>> @bci=279, line=447 (Interpreted frame)
>>  -
>> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh()
>> @bci=8, line=453 (Interpreted frame)
>>  - 
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
>> @bci=26, line=465 (Interpreted frame)
>>  - 
>> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
>> @bci=12, line=463 (Interpreted frame)
>>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1,
>> line=540 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh()
>> @bci=1, line=204 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
>> @bci=392, line=152 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>> @bci=1, line=108 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
>> @bci=1, line=108 (Interpreted frame)
>>  -
>> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQL

Re: [Spark SQL]: Spark Job Hangs on the refresh method when saving over 1 million files

2015-10-25 Thread Jerry Lam
Hi guys,

After waiting for a day, it actually caused an OOM on the Spark driver. I
configured the driver to have 6GB. Note that I didn't call refresh myself;
the method was called when saving the DataFrame in Parquet format. Also, I'm
using partitionBy() on the DataFrameWriter to generate over 1 million files.
Not sure why it OOMs the driver after the job is marked _SUCCESS in the
output folder.

Best Regards,

Jerry


On Sat, Oct 24, 2015 at 9:35 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Spark users and developers,
>
> Does anyone encounter any issue when a spark SQL job produces a lot of
> files (over 1 millions), the job hangs on the refresh method? I'm using
> spark 1.5.1. Below is the stack trace. I saw the parquet files are produced
> but the driver is doing something very intensively (it uses all the cpus).
> Does it mean Spark SQL cannot be used to produce over 1 million files in a
> single job?
>
> Thread 528: (state = BLOCKED)
>  - java.util.Arrays.copyOf(char[], int) @bci=1, line=2367 (Compiled frame)
>  - java.lang.AbstractStringBuilder.expandCapacity(int) @bci=43, line=130
> (Compiled frame)
>  - java.lang.AbstractStringBuilder.ensureCapacityInternal(int) @bci=12,
> line=114 (Compiled frame)
>  - java.lang.AbstractStringBuilder.append(java.lang.String) @bci=19,
> line=415 (Compiled frame)
>  - java.lang.StringBuilder.append(java.lang.String) @bci=2, line=132
> (Compiled frame)
>  - org.apache.hadoop.fs.Path.toString() @bci=128, line=384 (Compiled frame)
>  -
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(org.apache.hadoop.fs.FileStatus)
> @bci=4, line=447 (Compiled frame)
>  -
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache$$anonfun$listLeafFiles$1.apply(java.lang.Object)
> @bci=5, line=447 (Compiled frame)
>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
> @bci=9, line=244 (Compiled frame)
>  - scala.collection.TraversableLike$$anonfun$map$1.apply(java.lang.Object)
> @bci=2, line=244 (Compiled frame)
>  -
> scala.collection.IndexedSeqOptimized$class.foreach(scala.collection.IndexedSeqOptimized,
> scala.Function1) @bci=22, line=33 (Compiled frame)
>  - scala.collection.mutable.ArrayOps$ofRef.foreach(scala.Function1)
> @bci=2, line=108 (Compiled frame)
>  -
> scala.collection.TraversableLike$class.map(scala.collection.TraversableLike,
> scala.Function1, scala.collection.generic.CanBuildFrom) @bci=17, line=244
> (Compiled frame)
>  - scala.collection.mutable.ArrayOps$ofRef.map(scala.Function1,
> scala.collection.generic.CanBuildFrom) @bci=3, line=108 (Interpreted frame)
>  -
> org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.listLeafFiles(java.lang.String[])
> @bci=279, line=447 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation$FileStatusCache.refresh()
> @bci=8, line=453 (Interpreted frame)
>  - 
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache$lzycompute()
> @bci=26, line=465 (Interpreted frame)
>  - 
> org.apache.spark.sql.sources.HadoopFsRelation.org$apache$spark$sql$sources$HadoopFsRelation$$fileStatusCache()
> @bci=12, line=463 (Interpreted frame)
>  - org.apache.spark.sql.sources.HadoopFsRelation.refresh() @bci=1,
> line=540 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.parquet.ParquetRelation.refresh()
> @bci=1, line=204 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply$mcV$sp()
> @bci=392, line=152 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
> @bci=1, line=108 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1.apply()
> @bci=1, line=108 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(org.apache.spark.sql.SQLContext,
> org.apache.spark.sql.SQLContext$QueryExecution, scala.Function0) @bci=96,
> line=56 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation.run(org.apache.spark.sql.SQLContext)
> @bci=718, line=108 (Interpreted frame)
>  -
> org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute()
> @bci=20, line=57 (Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult()
> @bci=15, line=57 (Interpreted frame)
>  - org.apache.spark.sql.execution.ExecutedCommand.doExecute() @bci=12,
> line=69 (Interpreted frame)
>  - org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply()
> @bci=11, line=140 (Interpreted frame)
>  - org.apache.spark.sql.execution.Spa

Spark SQL: Issues with using DirectParquetOutputCommitter with APPEND mode and OVERWRITE mode

2015-10-22 Thread Jerry Lam
Hi Spark users and developers,

I read the ticket [SPARK-8578] (Should ignore user defined output committer
when appending data), which ignores the DirectParquetOutputCommitter if append
mode is selected. The rationale was that it is unsafe to use because it is not
possible to revert a failed job in append mode with the
DirectParquetOutputCommitter. Wouldn't it be better to allow users to use it
at their own risk? Say, if you use the DirectParquetOutputCommitter with
append mode, the job fails immediately when a task fails. The user can then
choose to reprocess the job entirely, which is not a big deal since failures
are rare in most cases. Another approach is to provide at-least-once task
semantics for append mode using the DirectParquetOutputCommitter. This will
end up producing duplicate records, but for some applications this is fine.

The second issue is that the assumption that Overwrite mode works with the
DirectParquetOutputCommitter in all cases is wrong, at least when using it
with S3. S3 provides only eventual consistency for overwrite PUTs and
DELETEs. So if you attempt to delete a directory and then create the same
directory again within a split second, the chance that you hit
org.apache.hadoop.fs.FileAlreadyExistsException is very high, because deletes
don't take effect immediately and creating the same file before it is
actually deleted results in the above exception. Might I propose to change
the code so that it actually overwrites the file instead of doing a delete
followed by a create?

Best Regards,

Jerry


Spark SQL: Preserving Dataframe Schema

2015-10-20 Thread Jerry Lam
Hi Spark users and developers,

I have a dataframe with the following schema (Spark 1.5.1):

StructType(StructField(type,StringType,true),
StructField(timestamp,LongType,false))

After I save the dataframe in parquet and read it back, I get the following
schema:

StructType(StructField(timestamp,LongType,true),
StructField(type,StringType,true))

As you can see, the schemas do not match: the nullable flag is set to true
for timestamp when the DataFrame is read back (and the field order changes).
Is there a way to preserve the schema so that what we write is exactly what
we read back?
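
One workaround that might be worth trying (a sketch, not something I have
verified) is to keep the original StructType around and supply it explicitly
on read instead of letting it be re-inferred from the Parquet footer:

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

val originalSchema = StructType(Seq(
  StructField("type", StringType, nullable = true),
  StructField("timestamp", LongType, nullable = false)))

df.write.parquet("out.parquet")

// read back with the declared schema rather than the inferred one
val restored = sqlContext.read.schema(originalSchema).parquet("out.parquet")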

Best Regards,

Jerry


Re: Spark executor on Mesos - how to set effective user id?

2015-10-19 Thread Jerry Lam
Can you try setting SPARK_USER at the driver? It is used to impersonate users
on the executors, so if you have a user set up for launching Spark jobs on the
executor machines, simply set SPARK_USER to that user name. There is another
configuration that prevents jobs from being launched by any user other than
the one that is configured; I don't remember its name, but it is in the
documentation.


Sent from my iPhone

> On 19 Oct, 2015, at 8:14 am, Eugene Chepurniy  wrote:
> 
> Hi everyone!
> While we are trying to utilize Spark On Mesos cluster, we are facing an
> issue related to effective linux user id being used to start executors on
> Mesos slaves: all executors are trying to use driver's linux user id to
> start on Mesos slaves. 
> Let me explain in detail: spark driver program (which is going to spawn
> Spark on Mesos in coarse mode) is started as unprivileged linux user, for
> example 'user1'. We have Spark distribution unpacked and ready-to-use on
> every mesos slave (placed at /opt/spark, 'spark.mesos.executor.home' is
> pointing to this folder). And after attempt to run every executor fails to
> start with error log telling user 'user1' is not available. And it is really
> true - there is no 'user1' present on Mesos slaves. 
> So my question is: how can I control effective user id which will be used
> for start executors on Mesos?
> Actually I was trying to setup SPARK_USER=nobody on every slave but it
> wasn't useful. 
> Thanks for advice if any.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-executor-on-Mesos-how-to-set-effective-user-id-tp25118.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Indexing Support

2015-10-18 Thread Jerry Lam
I'm interested in it, but I doubt there will be R-tree indexing support in the
near future, as Spark is not a database. You might have better luck looking at
databases with spatial indexing support out of the box.

Cheers

Sent from my iPad

On 2015-10-18, at 17:16, Mustafa Elbehery  wrote:

> Hi All, 
> 
> I am trying to use spark to process Spatial Data. I am looking for R-Tree 
> Indexing support in best case, but I would be fine with any other indexing 
> capability as well, just to improve performance. 
> 
> Anyone had the same issue before, and is there any information regarding 
> Index support in future releases ?!!
> 
> Regards.
> 
> -- 
> Mustafa Elbehery
> EIT ICT Labs Master School
> +49(0)15750363097
> skype: mustafaelbehery87
> 


Re: Dataframes - sole data structure for parallel computations?

2015-10-08 Thread Jerry Lam
I just read the article by ogirardot, but I don't agree.
It is like saying the pandas DataFrame is the sole data structure for analyzing
data in Python. Can a pandas DataFrame replace a NumPy array? The answer is
simply no, from an efficiency perspective, for some computations.

Unless there is a computer-science breakthrough in data structures
(i.e. the data structure of everything), the claim of a sole data structure
can only be treated as a joke. Just in case people get upset: I AM JOKING :)

> On Oct 8, 2015, at 1:56 PM, Michael Armbrust  wrote:
> 
> Don't worry, the ability to work with domain objects and lambda functions is 
> not going to go away.  However, we are looking at ways to leverage Tungsten's 
> improved performance when processing structured data.
> 
> More details can be found here:
> https://issues.apache.org/jira/browse/SPARK- 
> 
> 
> On Thu, Oct 8, 2015 at 7:40 AM, Tracewski, Lukasz 
>  > wrote:
> Hi,
> 
>  
> 
> Many people interpret this slide from Databricks
> 
> https://ogirardot.files.wordpress.com/2015/05/future-of-spark.png 
> 
> as indication that Dataframes API is going to be the main processing unit of 
> Spark and sole access point to MLlib, Streaming and such. Is it true? My 
> impression was that Dataframes are an additional abstraction layer with some 
> promising optimisation coming from Tungsten project, but that’s all. RDDs are 
> there to stay. They are a natural selection when it comes to e.g. processing 
> images.
> 
>  
> 
> Here is one article that advertises Dataframes as a “sole data structure for 
> parallel computations”:
> 
> https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/
>  
> 
>  (paragraph 4)
> 
>  
> 
> Cheers,
> 
> Lucas
> 
>  
> 
>  
> 
> 
> 
> ==
> Please access the attached hyperlink for an important electronic 
> communications disclaimer:
> http://www.credit-suisse.com/legal/en/disclaimer_email_ib.html 
> 
> ==
> 
> 



Re: spark-submit --packages using different resolver

2015-10-06 Thread Jerry Lam
This is the ticket SPARK-10951
<https://issues.apache.org/jira/browse/SPARK-10951>

Cheers~

On Tue, Oct 6, 2015 at 11:33 AM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Burak,
>
> Thank you for the tip.
> Unfortunately it does not work. It throws:
>
> java.net.MalformedURLException: unknown protocol: s3n]
> at
> org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1003)
> at
> org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:286)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> It looks like the meat is in the createRepoResolvers which does not
> currently support s3 repo. I will file a jira ticket for this.
>
> Best Regards,
>
> Jerry
>
> On Sat, Oct 3, 2015 at 12:50 PM, Burak Yavuz <brk...@gmail.com> wrote:
>
>> Hi Jerry,
>>
>> The --packages feature doesn't support private repositories right now.
>> However, in the case of s3, maybe it might work. Could you please try using
>> the --repositories flag and provide the address:
>> `$ spark-submit --packages my:awesome:package --repositories
>> s3n://$aws_ak:$aws_sak@bucket/path/to/repo`
>>
>> If that doesn't work, could you please file a JIRA?
>>
>> Best,
>> Burak
>>
>>
>> On Thu, Oct 1, 2015 at 8:58 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi spark users and developers,
>>>
>>> I'm trying to use spark-submit --packages against private s3 repository.
>>> With sbt, I'm using fm-sbt-s3-resolver with proper aws s3 credentials. I
>>> wonder how can I add this resolver into spark-submit such that --packages
>>> can resolve dependencies from private repo?
>>>
>>> Thank you!
>>>
>>> Jerry
>>>
>>
>>
>


Re: spark-submit --packages using different resolver

2015-10-06 Thread Jerry Lam
Hi Burak,

Thank you for the tip.
Unfortunately it does not work. It throws:

java.net.MalformedURLException: unknown protocol: s3n]
at
org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1003)
at
org.apache.spark.deploy.SparkSubmit$.prepareSubmitEnvironment(SparkSubmit.scala:286)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:153)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

It looks like the relevant code is in createRepoResolvers, which does not
currently support s3 repositories. I will file a JIRA ticket for this.

Best Regards,

Jerry

On Sat, Oct 3, 2015 at 12:50 PM, Burak Yavuz <brk...@gmail.com> wrote:

> Hi Jerry,
>
> The --packages feature doesn't support private repositories right now.
> However, in the case of s3, maybe it might work. Could you please try using
> the --repositories flag and provide the address:
> `$ spark-submit --packages my:awesome:package --repositories
> s3n://$aws_ak:$aws_sak@bucket/path/to/repo`
>
> If that doesn't work, could you please file a JIRA?
>
> Best,
> Burak
>
>
> On Thu, Oct 1, 2015 at 8:58 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi spark users and developers,
>>
>> I'm trying to use spark-submit --packages against private s3 repository.
>> With sbt, I'm using fm-sbt-s3-resolver with proper aws s3 credentials. I
>> wonder how can I add this resolver into spark-submit such that --packages
>> can resolve dependencies from private repo?
>>
>> Thank you!
>>
>> Jerry
>>
>
>


Re: Limiting number of cores per job in multi-threaded driver.

2015-10-04 Thread Jerry Lam
Philip, the guy is trying to help you; calling him silly is a bit too far. He
might assume your problem is IO-bound, which might not be the case. If you need
only 4 cores per job no matter what, there is little advantage to using Spark,
in my opinion, because you can easily do this with a worker farm that takes
each job and processes it on a single machine. Let the scheduler figure out
which nodes in the farm are idle and spawn jobs on those until all of them are
saturated. Call me silly, but this seems much simpler.

Sent from my iPhone

> On 3 Oct, 2015, at 12:02 am, Philip Weaver  wrote:
> 
> You can't really say 8 cores is not much horsepower when you have no idea 
> what my use case is. That's silly.
> 
>> On Fri, Sep 18, 2015 at 10:33 PM, Adrian Tanase  wrote:
>> Forgot to mention that you could also restrict the parallelism to 4, 
>> essentially using only 4 cores at any given time, however if your job is 
>> complex, a stage might be broken into more than 1 task...
>> 
>> Sent from my iPhone
>> 
>> On 19 Sep 2015, at 08:30, Adrian Tanase  wrote:
>> 
>>> Reading through the docs it seems that with a combination of FAIR scheduler 
>>> and maybe pools you can get pretty far.
>>> 
>>> However the smallest unit of scheduled work is the task so probably you 
>>> need to think about the parallelism of each transformation.
>>> 
>>> I'm guessing that by increasing the level of parallelism you get many 
>>> smaller tasks that the scheduler can then run across the many jobs you 
>>> might have - as opposed to fewer, longer tasks...
>>> 
>>> Lastly, 8 cores is not that much horsepower :) 
>>> You may consider running with beefier machines or a larger cluster, to get 
>>> at least tens of cores.
>>> 
>>> Hope this helps,
>>> -adrian
>>> 
>>> Sent from my iPhone
>>> 
>>> On 18 Sep 2015, at 18:37, Philip Weaver  wrote:
>>> 
 Here's a specific example of what I want to do. My Spark application is 
 running with total-executor-cores=8. A request comes in, it spawns a 
 thread to handle that request, and starts a job. That job should use only 
 4 cores, not all 8 of the cores available to the cluster.. When the first 
 job is scheduled, it should take only 4 cores, not all 8 of the cores that 
 are available to the driver.
 
 Is there any way to accomplish this? This is on mesos.
 
 In order to support the use cases described in 
 https://spark.apache.org/docs/latest/job-scheduling.html, where a spark 
 application runs for a long time and handles requests from multiple users, 
 I believe what I'm asking about is a very important feature. One of the 
 goals is to get lower latency for each request, but if the first request 
 takes all resources and we can't guarantee any free resources for the 
 second request, then that defeats the purpose. Does that make sense?
 
 Thanks in advance for any advice you can provide!
 
 - Philip
 
> On Sat, Sep 12, 2015 at 10:40 PM, Philip Weaver  
> wrote:
> I'm playing around with dynamic allocation in spark-1.5.0, with the FAIR 
> scheduler, so I can define a long-running application capable of 
> executing multiple simultaneous spark jobs.
> 
> The kind of jobs that I'm running do not benefit from more than 4 cores, 
> but I want my application to be able to take several times that in order 
> to run multiple jobs at the same time.
> 
> I suppose my question is more basic: How can I limit the number of cores 
> used to load an RDD or DataFrame? I can immediately repartition or 
> coalesce my RDD or DataFrame to 4 partitions after I load it, but that 
> doesn't stop Spark from using more cores to load it.
> 
> Does it make sense what I am trying to accomplish, and is there any way 
> to do it?
> 
> - Philip
> 


spark-submit --packages using different resolver

2015-10-01 Thread Jerry Lam
Hi spark users and developers,

I'm trying to use spark-submit --packages against a private S3 repository.
With sbt, I'm using fm-sbt-s3-resolver with proper AWS S3 credentials. I
wonder how I can add this resolver to spark-submit so that --packages can
resolve dependencies from the private repo?

Thank you!

Jerry


Re: Spark SQL: Implementing Custom Data Source

2015-09-29 Thread Jerry Lam
Hi Michael and Ted,

Thank you for the reference. Is it true that once I implement a custom data
source, it can be used from all Spark-supported languages, that is, Scala,
Java, Python and R? :)
I want to take advantage of the interoperability that is already built into
Spark.

Thanks!

Jerry

On Tue, Sep 29, 2015 at 11:31 AM, Michael Armbrust <mich...@databricks.com>
wrote:

> Thats a pretty advanced example that uses experimental APIs.  I'd suggest
> looking at https://github.com/databricks/spark-avro as a reference.
>
> On Mon, Sep 28, 2015 at 9:00 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> See this thread:
>>
>> http://search-hadoop.com/m/q3RTttmiYDqGc202
>>
>> And:
>>
>>
>> http://spark.apache.org/docs/latest/sql-programming-guide.html#data-sources
>>
>> On Sep 28, 2015, at 8:22 PM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>> Hi spark users and developers,
>>
>> I'm trying to learn how implement a custom data source for Spark SQL. Is
>> there a documentation that I can use as a reference? I'm not sure exactly
>> what needs to be extended/implemented. A general workflow will be greatly
>> helpful!
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>


Spark SQL: Implementing Custom Data Source

2015-09-28 Thread Jerry Lam
Hi spark users and developers,

I'm trying to learn how to implement a custom data source for Spark SQL. Is
there any documentation that I can use as a reference? I'm not sure exactly
what needs to be extended/implemented. A general workflow would be greatly
helpful!
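
For orientation, here is a minimal, read-only sketch of what the data source
API (the org.apache.spark.sql.sources package) expects. The package and class
names below are made up, and richer features would use PrunedScan,
PrunedFilteredScan, and so on:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// the provider must be named DefaultSource so that
// sqlContext.read.format("com.example.lines") can find it by package name
class DefaultSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation =
    new LineRelation(parameters("path"))(sqlContext)
}

// exposes each line of a text file as a single string column
class LineRelation(path: String)(@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  override def schema: StructType =
    StructType(Seq(StructField("line", StringType, nullable = true)))

  override def buildScan(): RDD[Row] =
    sqlContext.sparkContext.textFile(path).map(Row(_))
}

// usage: sqlContext.read.format("com.example.lines").load("/some/file")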

Best Regards,

Jerry


Re: Spark SQL: Native Support for LATERAL VIEW EXPLODE

2015-09-26 Thread Jerry Lam
Hi Michael,

Thanks for the tip. With DataFrames, is it possible to explode only selected
fields of each element in purchase_items?
Since purchase_items is an array of items and each item has a number of
fields (for example, product_id and price), is it possible to explode just
these two fields directly with the DataFrame API?

Best Regards,


Jerry

On Fri, Sep 25, 2015 at 7:53 PM, Michael Armbrust <mich...@databricks.com>
wrote:

> The SQL parser without HiveContext is really simple, which is why I
> generally recommend users use HiveContext.  However, you can do it with
> dataframes:
>
> import org.apache.spark.sql.functions._
> table("purchases").select(explode(df("purchase_items")).as("item"))
>
>
>
> On Fri, Sep 25, 2015 at 4:21 PM, Jerry Lam <chiling...@gmail.com> wrote:
>
>> Hi sparkers,
>>
>> Does anyone know how to do LATERAL VIEW EXPLODE without HiveContext?
>> I don't want to start up a metastore and derby just because I need
>> LATERAL VIEW EXPLODE.
>>
>> I have been trying but I always get the exception like this:
>>
>> Name: java.lang.RuntimeException
>> Message: [1.68] failure: ``union'' expected but identifier view found
>>
>> with a query that looks like:
>>
>> "select items from purhcases lateral view explode(purchase_items) tbl as
>> items"
>>
>> Best Regards,
>>
>> Jerry
>>
>>
>
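
As a sketch of what the follow-up question above asks (explode the array and
then keep only a couple of fields of each element), something along these
lines should work with the DataFrame API; the column names follow the example
in this thread and are otherwise assumptions:

    import org.apache.spark.sql.functions._

    val purchases = sqlContext.table("purchases")

    // One row per array element, then project just the struct fields needed.
    val items = purchases.select(explode(purchases("purchase_items")).as("item"))
    val projected = items.select(
      col("item.product_id").as("product_id"),
      col("item.price").as("price"))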


Spark SQL: Native Support for LATERAL VIEW EXPLODE

2015-09-25 Thread Jerry Lam
Hi sparkers,

Does anyone know how to do LATERAL VIEW EXPLODE without HiveContext?
I don't want to start up a metastore and derby just because I need LATERAL
VIEW EXPLODE.

I have been trying, but I always get an exception like this:

Name: java.lang.RuntimeException
Message: [1.68] failure: ``union'' expected but identifier view found

with a query that looks like:

"select items from purhcases lateral view explode(purchase_items) tbl as
items"

Best Regards,

Jerry


Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread Jerry Lam
Do you have specific reasons to use Ceph? I have used Ceph before and I'm not
too in love with it, especially the Ceph Object Gateway S3 API.
There are some incompatibilities with the AWS S3 API. You really need to
try it before making the commitment. Did you manage to install it?

On Tue, Sep 22, 2015 at 9:28 PM, fightf...@163.com 
wrote:

> Hi guys,
>
> Here is the info for Ceph : http://ceph.com/
>
> We are investigating and using Ceph for distributed storage and
> monitoring, specifically interested
>
> in using Ceph as the underlying file system storage for Spark. However, we
> had no experience achieving
>
> that. Has anybody seen such progress?
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>


Re: Re: Spark standalone/Mesos on top of Ceph

2015-09-22 Thread Jerry Lam
Hi Sun,

The issue with Ceph as the underlying file system for Spark is that you
lose data locality. Ceph is not designed to have Spark run directly on top
of the OSDs. I know that CephFS provides data location information via a
Hadoop-compatible API, but the last time I researched this, the
integration was experimental (just google it and you will find a lot of
discussions, e.g.
http://lists.ceph.com/pipermail/ceph-users-ceph.com/2015-July/002837.html).

However, this might not be the biggest issue as long as you have great
network bandwidth like InfiniBand or 10+ Gigabit Ethernet. My guess is that
the architecture and the performance will be similar to S3+Spark at best
(with 10GE instances) if you do the network stuff seriously.

HTH,

Jerry

On Tue, Sep 22, 2015 at 9:59 PM, fightf...@163.com <fightf...@163.com>
wrote:

> Hi Jerry
>
> Yeah, we managed to run and use ceph already in our few production
> environment, especially with OpenStack.
>
> The reason we want to use Ceph is that we aim to look for some workarounds
> for unified storage layer and the design
>
> concepts of Ceph are quite appealing. I am just interested in work like
> the hadoop cephfs plugin and recently we
>
> are going to do some benchmark tests between HDFS and cephfs.
>
> So the ongoing progress would be beneficial if some related work between
> Apache Spark and Ceph could dedicate some
>
> thoughtful insights.
>
> BTW, for the Ceph Object Gateway s3 rest api, agreed for such
> inconvenience and some incompatibilities. However, we had not
>
> yet quite researched and tested over radosgw a lot. But we had some little
> requirements using gw in some use cases.
>
> Hope for more considerations and talks.
>
> Best,
> Sun.
>
> --
> fightf...@163.com
>
>
> *From:* Jerry Lam <chiling...@gmail.com>
> *Date:* 2015-09-23 09:37
> *To:* fightf...@163.com
> *CC:* user <user@spark.apache.org>
> *Subject:* Re: Spark standalone/Mesos on top of Ceph
> Do you have specific reasons to use Ceph? I used Ceph before, I'm not too
> in love with it especially when I was using the Ceph Object Gateway S3 API.
> There are some incompatibilities with aws s3 api. You really really need to
> try it because making the commitment. Did you managed to install it?
>
> On Tue, Sep 22, 2015 at 9:28 PM, fightf...@163.com <fightf...@163.com>
> wrote:
>
>> Hi guys,
>>
>> Here is the info for Ceph : http://ceph.com/
>>
>> We are investigating and using Ceph for distributed storage and
>> monitoring, specifically interested
>>
>> in using Ceph as the underlied file system storage for spark. However, we
>> had no experience for achiveing
>>
>> that. Any body has seen such progress ?
>>
>> Best,
>> Sun.
>>
>> --
>> fightf...@163.com
>>
>
>
>
>
>


Re: How does one use s3 for checkpointing?

2015-09-21 Thread Jerry Lam
Hi Amit,

Have you looked at Amazon EMR? Most people using EMR use S3 for persistence 
(both as input and output of Spark jobs). 

Best Regards,

Jerry

Sent from my iPhone

> On 21 Sep, 2015, at 9:24 pm, Amit Ramesh  wrote:
> 
> 
> A lot of places in the documentation mention using s3 for checkpointing, 
> however I haven't found any examples or concrete evidence of anyone having 
> done this.
> 1. Is this a safe/reliable option given the read-after-write consistency for 
> PUTS in s3?
> 2. Is s3 access broken for hadoop 2.6 (SPARK-7442)? If so, is it viable in 2.4?
> 3. Related to #2. I did try providing hadoop-aws-2.6.0.jar while submitting the 
> job and got the following stack trace. Is there a fix?
> py4j.protocol.Py4JJavaError: An error occurred while calling 
> None.org.apache.spark.api.java.JavaSparkContext.
> : java.util.ServiceConfigurationError: org.apache.hadoop.fs.FileSystem: 
> Provider org.apache.hadoop.fs.s3a.S3AFileSystem could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:224)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:181)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:377)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:445)
> at 
> org.apache.hadoop.fs.FileSystem.loadFileSystems(FileSystem.java:2563)
> at 
> org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2574)
> at 
> org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2591)
> at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
> at 
> org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
> at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
> at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
> at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1354)
> at org.apache.spark.SparkContext.addFile(SparkContext.scala:1332)
> at 
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at 
> org.apache.spark.SparkContext$$anonfun$15.apply(SparkContext.scala:475)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:475)
> at 
> org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native 
> Method)
> at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at 
> py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at 
> py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.NoClassDefFoundError: 
> com/amazonaws/AmazonServiceException
> at java.lang.Class.getDeclaredConstructors0(Native Method)
> at java.lang.Class.privateGetDeclaredConstructors(Class.java:2585)
> at java.lang.Class.getConstructor0(Class.java:2885)
> at java.lang.Class.newInstance(Class.java:350)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:373)
> ... 27 more
> Caused by: java.lang.ClassNotFoundException: 
> com.amazonaws.AmazonServiceException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
> ... 32 more
> 
> Thanks!
> Amit
> 
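
A side note on the stack trace above: the root cause is a missing AWS SDK
class, so hadoop-aws needs its matching aws-java-sdk jar on the classpath as
well. Once both are present, a rough sketch of pointing Spark at s3a looks
like the following (the configuration keys are the standard Hadoop s3a
settings; the credentials, bucket name, and the assumption that sc/ssc already
exist are placeholders):

    // sc is an existing SparkContext, ssc a StreamingContext built on top of it.
    sc.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
    sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

    ssc.checkpoint("s3a://my-bucket/spark-checkpoints")  // hypothetical bucket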


Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
Hi Spark Developers,

I just ran some very simple operations on a dataset and was surprised by the
execution plan of take(1), head() and first().

For your reference, this is what I did in pyspark 1.5:
df=sqlContext.read.parquet("someparquetfiles")
df.head()

The above lines take over 15 minutes. I was frustrated because I can do
better without using Spark :) Since I like Spark, I tried to figure out
why. It seems the DataFrame requires 3 stages to give me the first row: it
reads all the data (about 1 billion rows) and runs Limit twice.

Instead of head(), show(1) runs much faster. Not to mention that if I do:

df.rdd.take(1) //runs much faster.

Is this expected? Why are head/first/take so slow for DataFrames? Is it a bug
in the optimizer, or did I do something wrong?

Best Regards,

Jerry


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
Hi Yin,

You are right! I just tried the Scala version with the above lines, and it
works as expected.
I'm not sure if it also happens in 1.4 for PySpark, but I thought the
PySpark code just calls the Scala code via py4j, so I didn't expect this
bug to be PySpark-specific. That actually surprises me a bit. I created a
ticket for this (SPARK-10731
<https://issues.apache.org/jira/browse/SPARK-10731>).

Best Regards,

Jerry


On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai <yh...@databricks.com> wrote:

> btw, does 1.4 has the same problem?
>
> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai <yh...@databricks.com> wrote:
>
>> Hi Jerry,
>>
>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>
>> Thanks,
>>
>> Yin
>>
>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam <chiling...@gmail.com> wrote:
>>
>>> Hi Spark Developers,
>>>
>>> I just ran some very simple operations on a dataset. I was surprise by
>>> the execution plan of take(1), head() or first().
>>>
>>> For your reference, this is what I did in pyspark 1.5:
>>> df=sqlContext.read.parquet("someparquetfiles")
>>> df.head()
>>>
>>> The above lines take over 15 minutes. I was frustrated because I can do
>>> better without using spark :) Since I like spark, so I tried to figure out
>>> why. It seems the dataframe requires 3 stages to give me the first row. It
>>> reads all data (which is about 1 billion rows) and run Limit twice.
>>>
>>> Instead of head(), show(1) runs much faster. Not to mention that if I do:
>>>
>>> df.rdd.take(1) //runs much faster.
>>>
>>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>>> bug in the optimizer? or I did something wrong?
>>>
>>> Best Regards,
>>>
>>> Jerry
>>>
>>
>>
>


Re: Spark SQL DataFrame 1.5.0 is extremely slow for take(1) or head() or first()

2015-09-21 Thread Jerry Lam
I just noticed you found that 1.4 has the same issue. I added that to the
ticket as well.

On Mon, Sep 21, 2015 at 1:43 PM, Jerry Lam <chiling...@gmail.com> wrote:

> Hi Yin,
>
> You are right! I just tried the scala version with the above lines, it
> works as expected.
> I'm not sure if it happens also in 1.4 for pyspark but I thought the
> pyspark code just calls the scala code via py4j. I didn't expect that this
> bug is pyspark specific. That surprises me actually a bit. I created a
> ticket for this (SPARK-10731
> <https://issues.apache.org/jira/browse/SPARK-10731>).
>
> Best Regards,
>
> Jerry
>
>
> On Mon, Sep 21, 2015 at 1:01 PM, Yin Huai <yh...@databricks.com> wrote:
>
>> btw, does 1.4 has the same problem?
>>
>> On Mon, Sep 21, 2015 at 10:01 AM, Yin Huai <yh...@databricks.com> wrote:
>>
>>> Hi Jerry,
>>>
>>> Looks like it is a Python-specific issue. Can you create a JIRA?
>>>
>>> Thanks,
>>>
>>> Yin
>>>
>>> On Mon, Sep 21, 2015 at 8:56 AM, Jerry Lam <chiling...@gmail.com> wrote:
>>>
>>>> Hi Spark Developers,
>>>>
>>>> I just ran some very simple operations on a dataset. I was surprise by
>>>> the execution plan of take(1), head() or first().
>>>>
>>>> For your reference, this is what I did in pyspark 1.5:
>>>> df=sqlContext.read.parquet("someparquetfiles")
>>>> df.head()
>>>>
>>>> The above lines take over 15 minutes. I was frustrated because I can do
>>>> better without using spark :) Since I like spark, so I tried to figure out
>>>> why. It seems the dataframe requires 3 stages to give me the first row. It
>>>> reads all data (which is about 1 billion rows) and run Limit twice.
>>>>
>>>> Instead of head(), show(1) runs much faster. Not to mention that if I
>>>> do:
>>>>
>>>> df.rdd.take(1) //runs much faster.
>>>>
>>>> Is this expected? Why head/first/take is so slow for dataframe? Is it a
>>>> bug in the optimizer? or I did something wrong?
>>>>
>>>> Best Regards,
>>>>
>>>> Jerry
>>>>
>>>
>>>
>>
>


Re: Java vs. Scala for Spark

2015-09-08 Thread Jerry Lam
Hi Bryan,

I would choose the language based on the requirements. Choosing Scala may not
make sense if you have a lot of Java-based dependencies, since
interoperability between Java and Scala is not always obvious.

I agree with the above comments that Java is much more verbose than Scala
in many cases, if not all. However, I personally don't find verbosity to be
a key factor in choosing a language. For the sake of argument, would you be
discouraged if you needed to write 3 lines of Java for 1 line of Scala? I
really don't care about the number of lines as long as I can finish the task
within a reasonable amount of time.

I believe (please correct me if I'm wrong) that all Spark functionality you
can find in Scala is also available in Java, including MLlib, Spark SQL,
Streaming, etc. So you won't miss any Spark features by using Java.

It seems the questions should be:
- Which language are the developers comfortable with?
- Which components in the system will constrain the choice of language?

Best Regards,

Jerry

On Tue, Sep 8, 2015 at 11:59 AM, Dean Wampler  wrote:

> It's true that Java 8 lambdas help. If you've read Learning Spark, where
> they use Java 7, Python, and Scala for the examples, it really shows how
> awful Java without lambdas is for Spark development.
>
> Still, there are several "power tools" in Scala I would sorely miss using
> Java 8:
>
> 1. The REPL (interpreter): I do most of my work in the REPL, then move the
> code to compiled code when I'm ready to turn it into a batch job. Even
> better, use Spark Notebook ! (and on GitHub
> ).
> 2. Tuples: It's just too convenient to use tuples for schemas, return
> values from functions, etc., etc., etc.,
> 3. Pattern matching: This has no analog in Java, so it's hard to
> appreciate it until you understand it, but see this example
> 
> for a taste of how concise it makes code!
> 4. Type inference: Spark really shows its utility. It means a lot less
> code to write, but you get the hints of what you just wrote!
>
> My $0.02.
>
> dean
>
>
> Dean Wampler, Ph.D.
> Author: Programming Scala, 2nd Edition
>  (O'Reilly)
> Typesafe 
> @deanwampler 
> http://polyglotprogramming.com
>
> On Tue, Sep 8, 2015 at 10:28 AM, Igor Berman 
> wrote:
>
>> we are using java7..its much more verbose that java8 or scala examples
>> in addition there sometimes libraries that has no java  api, so you need
>> to write them by yourself(e.g. graphx)
>> on the other hand, scala is not trivial language like java, so it depends
>> on your team
>>
>> On 8 September 2015 at 17:44, Bryan Jeffrey 
>> wrote:
>>
>>> Thank you for the quick responses.  It's useful to have some insight
>>> from folks already extensively using Spark.
>>>
>>> Regards,
>>>
>>> Bryan Jeffrey
>>>
>>> On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen  wrote:
>>>
 Why would Scala vs Java performance be different Ted? Relatively
 speaking there is almost no runtime difference; it's the same APIs or
 calls via a thin wrapper. Scala/Java vs Python is a different story.

 Java libraries can be used in Scala. Vice-versa too, though calling
 Scala-generated classes can be clunky in Java. What's your concern
 about interoperability Jeffrey?

 I disagree that Java 7 vs Scala usability is sooo different, but it's
 certainly much more natural to use Spark in Scala. Java 8 closes a lot
 of the usability gap with Scala, but not all of it. Enough that it's
 not crazy for a Java shop to stick to Java 8 + Spark and not be at a
 big disadvantage.

 The downsides of Scala IMHO are that it provides too much: lots of
 nice features (closures! superb collections!), lots of rope to hang
 yourself too (implicits sometimes!) and some WTF features (XML
 literals!) Learning the good useful bits of Scala isn't hard. You can
 always write Scala code as much like Java as you like, I find.

 Scala tooling is different from Java tooling; that's an
 underappreciated barrier. For example I think SBT is good for
 development, bad for general project lifecycle management compared to
 Maven, but in any event still less developed. SBT/scalac are huge
 resource hogs, since so much of Scala is really implemented in the
 compiler; prepare to update your laptop to develop in Scala on your
 IDE of choice, and start to think about running long-running compile
 servers like we did in the year 2000.

 Still net-net I would choose Scala, FWIW.

 On Tue, Sep 8, 2015 at 3:07 PM, Ted Yu  wrote:
 > Performance wise, Scala is by 

Re: Spark + Jupyter (IPython Notebook)

2015-08-18 Thread Jerry Lam
Hi Guru,

Thanks! Great to hear that someone tried it in production. How do you like
it so far?

Best Regards,

Jerry


On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote:

 Hi Jerry,

 Yes. I’ve seen customers using this in production for data science work.
 I’m currently using this for one of my projects on a cluster as well.

 Also, here is a blog that describes how to configure this.


 http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/


 Guru Medasani
 gdm...@gmail.com



 On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users and developers,

 Did anyone have IPython Notebook (Jupyter) deployed in production that
 uses Spark as the computational engine?

 I know Databricks Cloud provides similar features with deeper integration
 with Spark. However, Databricks Cloud has to be hosted by Databricks so we
 cannot do this.

 Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython
 has already offered years ago. It would be great if someone can educate me
 the reason behind this.

 Best Regards,

 Jerry





Re: Spark + Jupyter (IPython Notebook)

2015-08-18 Thread Jerry Lam
Hi Prabeesh,

That's even better!

Thanks for sharing

Jerry


On Tue, Aug 18, 2015 at 1:31 PM, Prabeesh K. prabsma...@gmail.com wrote:

 Refer this post
 http://blog.prabeeshk.com/blog/2015/06/19/pyspark-notebook-with-docker/

 Spark + Jupyter + Docker

 On 18 August 2015 at 21:29, Jerry Lam chiling...@gmail.com wrote:

 Hi Guru,

 Thanks! Great to hear that someone tried it in production. How do you
 like it so far?

 Best Regards,

 Jerry


 On Tue, Aug 18, 2015 at 11:38 AM, Guru Medasani gdm...@gmail.com wrote:

 Hi Jerry,

 Yes. I’ve seen customers using this in production for data science work.
 I’m currently using this for one of my projects on a cluster as well.

 Also, here is a blog that describes how to configure this.


 http://blog.cloudera.com/blog/2014/08/how-to-use-ipython-notebook-with-apache-spark/


 Guru Medasani
 gdm...@gmail.com



 On Aug 18, 2015, at 8:35 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi spark users and developers,

 Did anyone have IPython Notebook (Jupyter) deployed in production that
 uses Spark as the computational engine?

 I know Databricks Cloud provides similar features with deeper
 integration with Spark. However, Databricks Cloud has to be hosted by
 Databricks so we cannot do this.

 Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython
 has already offered years ago. It would be great if someone can educate me
 the reason behind this.

 Best Regards,

 Jerry







Spark + Jupyter (IPython Notebook)

2015-08-18 Thread Jerry Lam
Hi spark users and developers,

Has anyone deployed IPython Notebook (Jupyter) in production with Spark as
the computational engine?

I know Databricks Cloud provides similar features with deeper integration
with Spark. However, Databricks Cloud has to be hosted by Databricks, so we
cannot do this.

Other solutions (e.g. Zeppelin) seem to reinvent the wheel that IPython has
already offered for years. It would be great if someone could educate me on
the reasoning behind this.

Best Regards,

Jerry


Re: [survey] [spark-ec2] What do you like/dislike about spark-ec2?

2015-08-17 Thread Jerry Lam
Hi Nick,

I forgot to mention in the survey that ganglia is never installed properly
for some reason.

I get this exception every time I launch the cluster:

Starting httpd: httpd: Syntax error on line 154 of
/etc/httpd/conf/httpd.conf: Cannot load
/etc/httpd/modules/mod_authz_core.so into server:
/etc/httpd/modules/mod_authz_core.so: cannot open shared object file: No
such file or directory

[FAILED]

Best Regards,

Jerry

On Mon, Aug 17, 2015 at 11:09 AM, Nicholas Chammas 
nicholas.cham...@gmail.com wrote:

 Howdy folks!

 I’m interested in hearing about what people think of spark-ec2
 http://spark.apache.org/docs/latest/ec2-scripts.html outside of the
 formal JIRA process. Your answers will all be anonymous and public.

 If the embedded form below doesn’t work for you, you can use this link to
 get the same survey:

 http://goo.gl/forms/erct2s6KRR

 Cheers!
 Nick
 ​



Re: Controlling number of executors on Mesos vs YARN

2015-08-12 Thread Jerry Lam
Great stuff, Tim. This will definitely make Mesos users' lives easier.

Sent from my iPad

On 2015-08-12, at 11:52, Haripriya Ayyalasomayajula aharipriy...@gmail.com 
wrote:

 Thanks Tim, Jerry.
 
 On Wed, Aug 12, 2015 at 1:18 AM, Tim Chen t...@mesosphere.io wrote:
 Yes the options are not that configurable yet but I think it's not hard to 
 change it.
 
 I have a patch out actually specifically able to configure amount of cpus per 
 executor in coarse grain mode, and hopefully merged next release.
 
 I think the open question now is for fine grain mode can we limit the number 
 of maximum concurrent executors, and I think we can definitely just add a new 
 option like spark.mesos.executor.max to cap it. 
 
 I'll file a jira and hopefully to get this change in soon too.
 
 Tim
 
 
 
 On Tue, Aug 11, 2015 at 6:21 AM, Haripriya Ayyalasomayajula 
 aharipriy...@gmail.com wrote:
 Spark evolved as an example framework for Mesos - thats how I know it. It is 
 surprising to see that the options provided by mesos in this case are less. 
 Tweaking the source code, haven't done it yet but I would love to see what 
 options could be there! 
 
 On Tue, Aug 11, 2015 at 5:42 AM, Jerry Lam chiling...@gmail.com wrote:
 My experience with Mesos + Spark is not great. I saw one executor with 30 CPU 
 and the other executor with 6. So I don't think you can easily configure it 
 without some tweaking at the source code.
 
 Sent from my iPad
 
 On 2015-08-11, at 2:38, Haripriya Ayyalasomayajula aharipriy...@gmail.com 
 wrote:
 
 Hi Tim,
 
 Spark on Yarn allows us to do it using --num-executors and --executor_cores 
 commandline arguments. I just got a chance to look at a similar spark user 
 list mail, but no answer yet. So does mesos allow setting the number of 
 executors and cores? Is there a default number it assumes?
 
 On Mon, Jan 5, 2015 at 5:07 PM, Tim Chen t...@mesosphere.io wrote:
 Forgot to hit reply-all.
 
 -- Forwarded message --
 From: Tim Chen t...@mesosphere.io
 Date: Sun, Jan 4, 2015 at 10:46 PM
 Subject: Re: Controlling number of executors on Mesos vs YARN
 To: mvle m...@us.ibm.com
 
 
 Hi Mike,
 
 You're correct there is no such setting in for Mesos coarse grain mode, 
 since the assumption is that each node is launched with one container and 
 Spark is launching multiple tasks in that container.
 
 In fine-grain mode there isn't a setting like that, as it currently will 
 launch an executor as long as it satisfies the minimum container resource 
 requirement.
 
 I've created a JIRA earlier about capping the number of executors or better 
 distribute the # of executors launched in each node. Since the decision of 
 choosing what node to launch containers is all in the Spark scheduler side, 
 it's very easy to modify it.
 
 Btw, what's the configuration to set the # of executors on YARN side?
 
 Thanks,
 
 Tim
 
 
 
 On Sun, Jan 4, 2015 at 9:37 PM, mvle m...@us.ibm.com wrote:
 I'm trying to compare the performance of Spark running on Mesos vs YARN.
 However, I am having problems being able to configure the Spark workload to
 run in a similar way on Mesos and YARN.
 
 When running Spark on YARN, you can specify the number of executors per
 node. So if I have a node with 4 CPUs, I can specify 6 executors on that
 node. When running Spark on Mesos, there doesn't seem to be an equivalent
 way to specify this. In Mesos, you can somewhat force this by specifying the
 number of CPU resources to be 6 when running the slave daemon. However, this
 seems to be a static configuration of the Mesos cluster rather something
 that can be configured in the Spark framework.
 
 So here is my question:
 
 For Spark on Mesos, am I correct that there is no way to control the number
 of executors per node (assuming an idle cluster)? For Spark on Mesos
 coarse-grained mode, there is a way to specify max_cores but that is still
 not equivalent to specifying the number of executors per node as when Spark
 is run on YARN.
 
 If I am correct, then it seems Spark might be at a disadvantage running on
 Mesos compared to YARN (since it lacks the fine tuning ability provided by
 YARN).
 
 Thanks,
 Mike
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Controlling-number-of-executors-on-Mesos-vs-YARN-tp20966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
 
 
 -- 
 Regards,
 Haripriya Ayyalasomayajula 
 
 
 
 
 -- 
 Regards,
 Haripriya Ayyalasomayajula 
 
 
 
 
 
 -- 
 Regards,
 Haripriya Ayyalasomayajula 
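
The knobs contrasted in this thread can be summarised roughly as below (Spark
1.x property names; the numbers are illustrative only). On YARN the executor
count and size are explicit, while on Mesos coarse-grained mode only a total
core cap was available at the time, so the per-node split is left to Mesos:

    import org.apache.spark.SparkConf

    // YARN: explicit executor count and size (the SparkConf equivalents of
    // --num-executors / --executor-cores on spark-submit).
    val yarnConf = new SparkConf()
      .set("spark.executor.instances", "6")
      .set("spark.executor.cores", "4")

    // Mesos coarse-grained: only an application-wide core cap.
    val mesosConf = new SparkConf()
      .set("spark.mesos.coarse", "true")
      .set("spark.cores.max", "24")   // total cores across the cluster, not per node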
 


Re: Controlling number of executors on Mesos vs YARN

2015-08-11 Thread Jerry Lam
My experience with Mesos + Spark is not great. I saw one executor with 30 CPUs 
and another executor with 6. So I don't think you can easily configure it 
without some tweaking of the source code.

Sent from my iPad

On 2015-08-11, at 2:38, Haripriya Ayyalasomayajula aharipriy...@gmail.com 
wrote:

 Hi Tim,
 
 Spark on Yarn allows us to do it using --num-executors and --executor_cores 
 commandline arguments. I just got a chance to look at a similar spark user 
 list mail, but no answer yet. So does mesos allow setting the number of 
 executors and cores? Is there a default number it assumes?
 
 On Mon, Jan 5, 2015 at 5:07 PM, Tim Chen t...@mesosphere.io wrote:
 Forgot to hit reply-all.
 
 -- Forwarded message --
 From: Tim Chen t...@mesosphere.io
 Date: Sun, Jan 4, 2015 at 10:46 PM
 Subject: Re: Controlling number of executors on Mesos vs YARN
 To: mvle m...@us.ibm.com
 
 
 Hi Mike,
 
 You're correct there is no such setting in for Mesos coarse grain mode, since 
 the assumption is that each node is launched with one container and Spark is 
 launching multiple tasks in that container.
 
 In fine-grain mode there isn't a setting like that, as it currently will 
 launch an executor as long as it satisfies the minimum container resource 
 requirement.
 
 I've created a JIRA earlier about capping the number of executors or better 
 distribute the # of executors launched in each node. Since the decision of 
 choosing what node to launch containers is all in the Spark scheduler side, 
 it's very easy to modify it.
 
 Btw, what's the configuration to set the # of executors on YARN side?
 
 Thanks,
 
 Tim
 
 
 
 On Sun, Jan 4, 2015 at 9:37 PM, mvle m...@us.ibm.com wrote:
 I'm trying to compare the performance of Spark running on Mesos vs YARN.
 However, I am having problems being able to configure the Spark workload to
 run in a similar way on Mesos and YARN.
 
 When running Spark on YARN, you can specify the number of executors per
 node. So if I have a node with 4 CPUs, I can specify 6 executors on that
 node. When running Spark on Mesos, there doesn't seem to be an equivalent
 way to specify this. In Mesos, you can somewhat force this by specifying the
 number of CPU resources to be 6 when running the slave daemon. However, this
 seems to be a static configuration of the Mesos cluster rather something
 that can be configured in the Spark framework.
 
 So here is my question:
 
 For Spark on Mesos, am I correct that there is no way to control the number
 of executors per node (assuming an idle cluster)? For Spark on Mesos
 coarse-grained mode, there is a way to specify max_cores but that is still
 not equivalent to specifying the number of executors per node as when Spark
 is run on YARN.
 
 If I am correct, then it seems Spark might be at a disadvantage running on
 Mesos compared to YARN (since it lacks the fine tuning ability provided by
 YARN).
 
 Thanks,
 Mike
 
 
 
 --
 View this message in context: 
 http://apache-spark-user-list.1001560.n3.nabble.com/Controlling-number-of-executors-on-Mesos-vs-YARN-tp20966.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.
 
 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org
 
 
 
 
 
 
 -- 
 Regards,
 Haripriya Ayyalasomayajula 
 


Re: Parquet without hadoop: Possible?

2015-08-11 Thread Jerry Lam
Just out of curiosity, what is the advantage of using parquet without hadoop?


Sent from my iPhone

 On 11 Aug, 2015, at 11:12 am, saif.a.ell...@wellsfargo.com wrote:
 
 I confirm that it works,
  
 I was just having this issue: https://issues.apache.org/jira/browse/SPARK-8450
  
 Saif
  
 From: Ellafi, Saif A. 
 Sent: Tuesday, August 11, 2015 12:01 PM
 To: Ellafi, Saif A.; deanwamp...@gmail.com
 Cc: user@spark.apache.org
 Subject: RE: Parquet without hadoop: Possible?
  
 Sorry, I provided bad information. This example worked fine with reduced 
 parallelism.
  
 It seems my problem have to do with something specific with the real data 
 frame at reading point.
  
 Saif
  
  
 From: saif.a.ell...@wellsfargo.com [mailto:saif.a.ell...@wellsfargo.com] 
 Sent: Tuesday, August 11, 2015 11:49 AM
 To: deanwamp...@gmail.com
 Cc: user@spark.apache.org
 Subject: RE: Parquet without hadoop: Possible?
  
 I am launching my spark-shell
 spark-1.4.1-bin-hadoop2.6/bin/spark-shell
  
 15/08/11 09:43:32 INFO SparkILoop: Created sql context (with Hive support)..
 SQL context available as sqlContext.
  
scala> val data = sc.parallelize(Array(2,3,5,7,2,3,6,1)).toDF
scala> data.write.parquet("/var/data/Saif/pq")
  
 Then I get a million errors:
 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
 15/08/11 09:46:01 INFO CodecPool: Got brand-new compressor [.gz]
 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
 15/08/11 09:46:09 ERROR InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
 15/08/11 09:46:08 ERROR InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
 15/08/11 09:46:07 ERROR InsertIntoHadoopFsRelation: Aborting task.
 java.lang.OutOfMemoryError: Java heap space
 at 
 parquet.bytes.CapacityByteArrayOutputStream.initSlabs(CapacityByteArrayOutputStream.java:65)
 at 
 parquet.bytes.CapacityByteArrayOutputStream.init(CapacityByteArrayOutputStream.java:57)
 at 
 parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:68)
 at 
 parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.init(ColumnChunkPageWriteStore.java:48)
 at 
 parquet.hadoop.ColumnChunkPageWriteStore.getPageWriter(ColumnChunkPageWriteStore.java:215)
 at 
 parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:67)
 at 
 parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
 at 
 parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.init(MessageColumnIO.java:178)
 at 
 parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
 at 
 parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
 at 
 parquet.hadoop.InternalParquetRecordWriter.init(InternalParquetRecordWriter.java:94)
 at 
 parquet.hadoop.ParquetRecordWriter.init(ParquetRecordWriter.java:64)
 at 
 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
 at 
 parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
 at 
 org.apache.spark.sql.parquet.ParquetOutputWriter.init(newParquet.scala:83)
 at 
 org.apache.spark.sql.parquet.ParquetRelation2$$anon$4.newInstance(newParquet.scala:229)
 at 
 org.apache.spark.sql.sources.DefaultWriterContainer.initWriters(commands.scala:470)
 at 
 org.apache.spark.sql.sources.BaseWriterContainer.executorSideSetup(commands.scala:360)
 at 
 org.apache.spark.sql.sources.InsertIntoHadoopFsRelation.org$apache$spark$sql$sources$InsertIntoHadoopFsRelation$$writeRows$1(commands.scala:172)
 at 
 org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
 at 
 org.apache.spark.sql.sources.InsertIntoHadoopFsRelation$$anonfun$insert$1.apply(commands.scala:160)
 at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
 at org.apache.spark.scheduler.Task.run(Task.scala:70)
 at 
 org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
 at 
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at 
 

Re: Accessing S3 files with s3n://

2015-08-09 Thread Jerry Lam
Hi Akshat,

Is there a particular reason you don't use s3a? From my experience, s3a performs 
much better than the rest. I believe the inefficiency is in the 
implementation of the s3 interface.

Best Regards,

Jerry

Sent from my iPhone

 On 9 Aug, 2015, at 5:48 am, Akhil Das ak...@sigmoidanalytics.com wrote:
 
 Depends on which operation you are doing, If you are doing a .count() on a 
 parquet, it might not download the entire file i think, but if you do a 
 .count() on a normal text file it might pull the entire file.
 
 Thanks
 Best Regards
 
 On Sat, Aug 8, 2015 at 3:12 AM, Akshat Aranya aara...@gmail.com wrote:
 Hi,
 
 I've been trying to track down some problems with Spark reads being very 
 slow with s3n:// URIs (NativeS3FileSystem).  After some digging around, I 
 realized that this file system implementation fetches the entire file, which 
 isn't really a Spark problem, but it really slows down things when trying to 
 just read headers from a Parquet file or just creating partitions in the 
 RDD.  Is this something that others have observed before, or am I doing 
 something wrong?
 
 Thanks,
 Akshat
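
A sketch of the comparison being suggested: read the same dataset through the
two connectors and compare the input metrics (the bucket and paths are
placeholders, and hadoop-aws plus the AWS SDK must be on the classpath for
s3a to be available):

    // The thread above suggests s3n (NativeS3FileSystem) ends up pulling far
    // more data than needed for metadata-heavy operations; s3a is generally
    // more efficient for the same read.
    val viaS3n = sqlContext.read.parquet("s3n://my-bucket/data.parquet")
    val viaS3a = sqlContext.read.parquet("s3a://my-bucket/data.parquet")

    viaS3n.count()
    viaS3a.count()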
 


Poor HDFS Data Locality on Spark-EC2

2015-08-04 Thread Jerry Lam
Hi Spark users and developers,

I have been trying to use spark-ec2. After I launched the Spark cluster
(1.4.1) with ephemeral HDFS (using Hadoop 2.4.0), I tried to execute a job
where the data is stored in the ephemeral HDFS. No matter what I try,
there is no data locality at all. For instance, filtering data and
counting the filtered data always runs at locality level ANY. I tweaked
the spark.locality.wait.* configurations but it does not seem to make a
difference. I'm guessing this is because the hostnames cannot be
resolved properly. Has anyone experienced this problem before?

Best Regards,

Jerry


Spark Master Build Git Commit Hash

2015-07-30 Thread Jerry Lam
Hi Spark users and developers,

I wonder which git commit was used to build the latest master-nightly build
found at:
http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/?
I downloaded the build but I couldn't find the information related to it.
Thank you!

Best Regards,

Jerry


Re: Spark Master Build Git Commit Hash

2015-07-30 Thread Jerry Lam
Hi Ted,

The problem is that I don't know whether the build uses the commits that
happened on the same day, or whether it was built from the Jul 15th commits.
Just a thought: it might be possible to replace SNAPSHOT with the git
commit hash in the filename so people will know which commit it is based on.

Thank you for your help!

Jerry

On Thu, Jul 30, 2015 at 11:10 AM, Ted Yu yuzhih...@gmail.com wrote:

 The files were dated 16-Jul-2015
 Looks like nightly build either was not published, or published at a
 different location.

 You can download spark-1.5.0-SNAPSHOT.tgz and binary-search for the
 commits made on Jul 16th.
 There may be other ways of determining the latest commit.

 Cheers

 On Thu, Jul 30, 2015 at 7:39 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi Spark users and developers,

 I wonder which git commit was used to build the latest master-nightly
 build found at:
 http://people.apache.org/~pwendell/spark-nightly/spark-master-bin/latest/
 ?
 I downloaded the build but I couldn't find the information related to it.
 Thank you!

 Best Regards,

 Jerry





Unexpected performance issues with Spark SQL using Parquet

2015-07-27 Thread Jerry Lam
Hi spark users and developers,

I have been trying to understand how Spark SQL works with Parquet for the
past couple of days. There is an unexpected performance problem with
column pruning. Here is a dummy example:

The parquet file has the 3 fields:

 |-- customer_id: string (nullable = true)
 |-- type: string (nullable = true)
 |-- mapping: map (nullable = true)
 ||-- key: string
 ||-- value: string (nullable = true)

Note that mapping is just a field with a lot of key-value pairs.
I created a parquet file with 1 billion entries, with each entry
having 10 key-value pairs in the mapping.

After generating this parquet file, I generated another parquet file without
the mapping field, that is:
 |-- customer_id: string (nullable = true)
 |-- type: string (nullable = true)

Let's call the first parquet file data-with-mapping and the second parquet
file data-without-mapping.

Then I ran a very simple query over two parquet files:
val df = sqlContext.read.parquet(path)
df.select(df("type")).count

The run on the data-with-mapping takes 34 seconds with the input size
of 11.7 MB.
The run on the data-without-mapping takes 8 seconds with the input size of
7.6 MB.

They all ran on the same cluster with spark 1.4.1.
What bothers me the most is the input size, because I assumed column
pruning would only deserialize the columns relevant to the query (in
this case the field type), but it clearly reads more data for
data-with-mapping than for data-without-mapping. The query is 4x faster on
data-without-mapping, which suggests that the more columns a parquet file
has, the slower it is, even when only a specific column is needed.

Does anyone have an explanation for this? I was expecting both of them to
finish in approximately the same time.

Best Regards,

Jerry
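
A small sketch that may help others reproduce or investigate this: the
physical plan shows which columns Spark asks Parquet for, so comparing the
plans for the two files is a quick sanity check (path is the same placeholder
as above):

    val df = sqlContext.read.parquet(path)

    // The ParquetRelation node in the physical plan lists the requested
    // columns, which confirms whether pruning down to "type" is happening.
    df.select(df("type")).explain(true)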


Re: Parquet problems

2015-07-22 Thread Jerry Lam
Hi guys,

I noticed that too. Anders, can you confirm that it works on the Spark 1.5
snapshot? That is what I tried in the end. It seems to be a 1.4 issue.

Best Regards,

Jerry

On Wed, Jul 22, 2015 at 11:46 AM, Anders Arpteg arp...@spotify.com wrote:

 No, never really resolved the problem, except by increasing the permgen
 space, which only partially solved it. Still have to restart the job
 multiple times to make the whole job complete (it stores intermediate
 results).

 The parquet data sources have about 70 columns, and yes Cheng, it works
 fine when only loading a small sample of the data.

 Thankful for any hints,
 Anders

 On Wed, Jul 22, 2015 at 5:29 PM Cheng Lian lian.cs@gmail.com wrote:

  How many columns are there in these Parquet files? Could you load a
 small portion of the original large dataset successfully?

 Cheng


 On 6/25/15 5:52 PM, Anders Arpteg wrote:

 Yes, both the driver and the executors. Works a little bit better with
 more space, but still a leak that will cause failure after a number of
 reads. There are about 700 different data sources that needs to be loaded,
 lots of data...

  tor 25 jun 2015 08:02 Sabarish Sasidharan 
 sabarish.sasidha...@manthan.com skrev:

 Did you try increasing the perm gen for the driver?

 Regards
 Sab

 On 24-Jun-2015 4:40 pm, Anders Arpteg arp...@spotify.com wrote:

 When reading large (and many) datasets with the Spark 1.4.0 DataFrames
 parquet reader (the org.apache.spark.sql.parquet format), the following
 exceptions are thrown:

  Exception in thread sk-result-getter-0

 Exception: java.lang.OutOfMemoryError thrown from the
 UncaughtExceptionHandler in thread task-result-getter-0
 Exception in thread task-result-getter-3 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-1 java.lang.OutOfMemoryError:
 PermGen space
 Exception in thread task-result-getter-2 java.lang.OutOfMemoryError:
 PermGen space


  and many more like these from different threads. I've tried
 increasing the PermGen space using the -XX:MaxPermSize VM setting, but even
 after tripling the space, the same errors occur. I've also tried storing
 intermediate results, and am able to get the full job completed by running
 it multiple times and starting for the last successful intermediate result.
 There seems to be some memory leak in the parquet format. Any hints on how
 to fix this problem?

  Thanks,
 Anders
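
For what it's worth, a sketch of applying the PermGen setting mentioned above
through Spark configuration (Java 7 era; the 512m value is an illustrative
assumption):

    import org.apache.spark.SparkConf

    // Executor-side JVM options can be set programmatically; the driver-side
    // option generally has to be passed at launch time (e.g. via
    // spark-submit --conf or spark-defaults.conf), because the driver JVM is
    // already running by the time this SparkConf is built.
    val conf = new SparkConf()
      .set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=512m")
      .set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=512m")  // see note above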




Re: Counting distinct values for a key?

2015-07-19 Thread Jerry Lam
You mean this does not work?

SELECT key, count(value) from table group by key



On Sun, Jul 19, 2015 at 2:28 PM, N B nb.nos...@gmail.com wrote:

 Hello,

 How do I go about performing the equivalent of the following SQL clause in
 Spark Streaming? I will be using this on a Windowed DStream.

 SELECT key, count(distinct(value)) from table group by key;

 so for example, given the following dataset in the table:

  key | value
 -+---
  k1  | v1
  k1  | v1
  k1  | v2
  k1  | v3
  k1  | v3
  k2  | vv1
  k2  | vv1
  k2  | vv2
  k2  | vv2
  k2  | vv2
  k3  | vvv1
  k3  | vvv1

 the result will be:

  key | count
 -+---
  k1  | 3
  k2  | 2
  k3  | 1

 Thanks
 Nikunj




Re: Spark Mesos Dispatcher

2015-07-19 Thread Jerry Lam
Yes. 

Sent from my iPhone

 On 19 Jul, 2015, at 10:52 pm, Jahagirdar, Madhu 
 madhu.jahagir...@philips.com wrote:
 
 All,
 
 Can we run different version of Spark using the same Mesos Dispatcher. For 
 example we can run drivers with Spark 1.3 and Spark 1.4 at the same time ?
 
 Regards,
 Madhu Jahagirdar
 
 The information contained in this message may be confidential and legally 
 protected under applicable law. The message is intended solely for the 
 addressee(s). If you are not the intended recipient, you are hereby notified 
 that any use, forwarding, dissemination, or reproduction of this message is 
 strictly prohibited and may be unlawful. If you are not the intended 
 recipient, please contact the sender by return e-mail and destroy all copies 
 of the original message.


Re: Counting distinct values for a key?

2015-07-19 Thread Jerry Lam
Hi Nikunj,

Sorry, I totally misread your question.
I think you need to first groupByKey (to get all values of the same key together), 
then follow with mapValues (probably putting the values into a set and then taking 
its size, since you want a distinct count).

HTH,

Jerry

Sent from my iPhone

 On 19 Jul, 2015, at 8:48 pm, N B nb.nos...@gmail.com wrote:
 
 Hi Suyog,
 
 That code outputs the following:
 
 key2 val22 : 1
 key1 val1 : 2
 key2 val2 : 2
 
 while the output I want to achieve would have been (with your example):
 
 key1 : 2
 key2 : 2
 
 because there are 2 distinct types of values for each key ( regardless of 
 their actual duplicate counts .. hence the use of the DISTINCT keyword in the 
 query equivalent ).
 
 Thanks
 Nikunj
 
 
 On Sun, Jul 19, 2015 at 2:37 PM, suyog choudhari suyogchoudh...@gmail.com 
 wrote:
public static void main(String[] args) {

    SparkConf sparkConf = new SparkConf().setAppName("CountDistinct");

    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    List<Tuple2<String, String>> list = new ArrayList<Tuple2<String, String>>();

    list.add(new Tuple2<String, String>("key1", "val1"));
    list.add(new Tuple2<String, String>("key1", "val1"));
    list.add(new Tuple2<String, String>("key2", "val2"));
    list.add(new Tuple2<String, String>("key2", "val2"));
    list.add(new Tuple2<String, String>("key2", "val22"));

    JavaPairRDD<String, Integer> rdd = jsc.parallelize(list)
        .mapToPair(t -> new Tuple2<String, Integer>(t._1 + " " + t._2, 1));

    JavaPairRDD<String, Integer> rdd2 = rdd.reduceByKey((c1, c2) -> c1 + c2);

    List<Tuple2<String, Integer>> output = rdd2.collect();

    for (Tuple2<?, ?> tuple : output) {

        System.out.println(tuple._1() + " : " + tuple._2());

    }

}
 
 
 On Sun, Jul 19, 2015 at 2:28 PM, Jerry Lam chiling...@gmail.com wrote:
 You mean this does not work?
 
 SELECT key, count(value) from table group by key
 
 
 
 On Sun, Jul 19, 2015 at 2:28 PM, N B nb.nos...@gmail.com wrote:
 Hello,
 
 How do I go about performing the equivalent of the following SQL clause in 
 Spark Streaming? I will be using this on a Windowed DStream.
 
 SELECT key, count(distinct(value)) from table group by key;
 
 so for example, given the following dataset in the table:
 
  key | value
 -+---
  k1  | v1
  k1  | v1
  k1  | v2
  k1  | v3
  k1  | v3
  k2  | vv1
  k2  | vv1
  k2  | vv2
  k2  | vv2
  k2  | vv2
  k3  | vvv1
  k3  | vvv1
 
 the result will be:
 
  key | count
 -+---
  k1  | 3
  k2  | 2
  k3  | 1
 
 Thanks
 Nikunj
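
A sketch of the groupByKey / mapValues approach described above, using the
sample data from this thread (plain RDD API in Scala, assuming an existing
SparkContext sc):

    val pairs = sc.parallelize(Seq(
      ("k1", "v1"), ("k1", "v1"), ("k1", "v2"), ("k1", "v3"), ("k1", "v3"),
      ("k2", "vv1"), ("k2", "vv1"), ("k2", "vv2"), ("k2", "vv2"), ("k2", "vv2"),
      ("k3", "vvv1"), ("k3", "vvv1")))

    val distinctCounts = pairs
      .groupByKey()               // gather all values for each key
      .mapValues(_.toSet.size)    // distinct count per key

    distinctCounts.collect().foreach(println)
    // (k1,3), (k2,2), (k3,1)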
 


Re: Spark Mesos Dispatcher

2015-07-19 Thread Jerry Lam
I have only used client mode with both the 1.3 and 1.4 versions on Mesos.
I skimmed through
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala.
I would actually backport the Cluster Mode feature. Sorry, I don't have an
answer for this.


On Sun, Jul 19, 2015 at 11:16 PM, Jahagirdar, Madhu 
madhu.jahagir...@philips.com wrote:

  1.3 does not have MesosDisptacher or does not have support for Mesos
 cluster mode , is it still possible to create a Dispatcher using 1.4 and
 run 1.3 using that dispatcher ?
  --
 *From:* Jerry Lam [chiling...@gmail.com]
 *Sent:* Monday, July 20, 2015 8:27 AM
 *To:* Jahagirdar, Madhu
 *Cc:* user; d...@spark.apache.org
 *Subject:* Re: Spark Mesos Dispatcher

   Yes.

 Sent from my iPhone

 On 19 Jul, 2015, at 10:52 pm, Jahagirdar, Madhu 
 madhu.jahagir...@philips.com wrote:

   All,

  Can we run different version of Spark using the same Mesos Dispatcher.
 For example we can run drivers with Spark 1.3 and Spark 1.4 at the same
 time ?

  Regards,
 Madhu Jahagirdar

 --
 The information contained in this message may be confidential and legally
 protected under applicable law. The message is intended solely for the
 addressee(s). If you are not the intended recipient, you are hereby
 notified that any use, forwarding, dissemination, or reproduction of this
 message is strictly prohibited and may be unlawful. If you are not the
 intended recipient, please contact the sender by return e-mail and destroy
 all copies of the original message.




Re: Benchmark results between Flink and Spark

2015-07-14 Thread Jerry Lam
FYI, another benchmark:
http://eastcirclek.blogspot.kr/2015/06/terasort-for-spark-and-flink-with-range.html

quote: I have observed a lot of fetch failures while running Spark, which
results in many restarted tasks and, therefore, takes the longest time. I
suspect that executors are incapable of serving shuffle data due to JVMs
doing long garbage-collection (I also tried large numbers for
spark.core.connection.ack.wait.timeout). Flink seems to be irrelevant to GC
issues thanks to its own internal memory management. MapReduce and Tez
execute each task in a separate process and rely on an external auxiliary
service for shuffling. Although the shuffle service could exhibit fetch
failures for other reasons, it works without any fetch failure in this
experiment for Hadoop MapReduce and Tez.

On Mon, Jul 6, 2015 at 3:13 AM, Jan-Paul Bultmann janpaulbultm...@me.com
wrote:

 Sorry, that should be shortest path, and diameter of the graph.
 I shouldn't write emails before I get my morning coffee...

 On 06 Jul 2015, at 09:09, Jan-Paul Bultmann janpaulbultm...@me.com
 wrote:

 I would guess the opposite is true for highly iterative benchmarks (common
 in graph processing and data-science).

 Spark has a pretty large overhead per iteration, more optimisations and
 planning only makes this worse.

 Sure people implemented things like dijkstra's algorithm in spark
 (a problem where the number of iterations is bounded by the circumference
 of the input graph),
 but all the datasets I've seen it running on had a very small
 circumference (which is common for e.g. social networks).

 Take sparkSQL for example. Catalyst is a really good query optimiser, but
 it introduces significant overhead.
 Since spark has no iterative semantics on its own (unlike flink),
 one has to materialise the intermediary dataframe at each iteration
 boundary to determine if a termination criterion is reached.
 This causes a huge amount of planning, especially since it looks like
 catalyst will try to optimise the dependency graph
 regardless of caching. A dependency graph that grows in the number of
 iterations and thus the size of the input dataset.

 In flink on the other hand, you can describe you entire iterative program
 through transformations without ever calling an action.
 This means that the optimiser will only have to do planing once.

 Just my 2 cents :)
 Cheers, Jan

 On 06 Jul 2015, at 06:10, n...@reactor8.com wrote:

 Maybe some flink benefits from some pts they outline here:

 http://flink.apache.org/news/2015/05/11/Juggling-with-Bits-and-Bytes.html

 Probably if re-ran the benchmarks with 1.5/tungsten line would close the
 gap a bit(or a lot) with spark moving towards similar style off-heap memory
 mgmt, more planning optimizations


 *From:* Jerry Lam [mailto:chiling...@gmail.com]
 *Sent:* Sunday, July 5, 2015 6:28 PM
 *To:* Ted Yu
 *Cc:* Slim Baltagi; user
 *Subject:* Re: Benchmark results between Flink and Spark

 Hi guys,

 I just read the paper too. There is no much information regarding why
 Flink is faster than Spark for data science type of workloads in the
 benchmark. It is very difficult to generalize the conclusion of a benchmark
 from my point of view. How much experience the author has with Spark is in
 comparisons to Flink is one of the immediate questions I have. It would be
 great if they have the benchmark software available somewhere for other
 people to experiment.

 just my 2 cents,

 Jerry

 On Sun, Jul 5, 2015 at 4:35 PM, Ted Yu yuzhih...@gmail.com wrote:

 There was no mentioning of the versions of Flink and Spark used in
 benchmarking.

 The size of cluster is quite small.

 Cheers

 On Sun, Jul 5, 2015 at 10:24 AM, Slim Baltagi sbalt...@gmail.com wrote:

 Hi

 Apache Flink outperforms Apache Spark in processing machine learning 
 graph
 algorithms and relational queries but not in batch processing!

 The results were published in the proceedings of the 18th International
 Conference, Business Information Systems 2015, Poznań, Poland, June 24-26,
 2015.

 Thanks to our friend Google, Chapter 3: 'Evaluating New Approaches of Big
 Data Analytics Frameworks' by Norman Spangenberg, Martin Roth and Bogdan
 Franczyk is available for preview at http://goo.gl/WocQci on pages 28-37.

 Enjoy!

 Slim Baltagi
 http://www.SparkBigData.com




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Benchmark-results-between-Flink-and-Spark-tp23626.html
  Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org






Re: Benchmark results between Flink and Spark

2015-07-05 Thread Jerry Lam
Hi guys,

I just read the paper too. There is not much information in the benchmark
regarding why Flink is faster than Spark for data-science-type workloads.
It is very difficult to generalize the conclusion of a benchmark, from my
point of view. How much experience the authors have with Spark in
comparison to Flink is one of the immediate questions I have. It would be
great if they made the benchmark software available somewhere for other
people to experiment with.

just my 2 cents,

Jerry

On Sun, Jul 5, 2015 at 4:35 PM, Ted Yu yuzhih...@gmail.com wrote:

 There was no mentioning of the versions of Flink and Spark used in
 benchmarking.

 The size of cluster is quite small.

 Cheers

 On Sun, Jul 5, 2015 at 10:24 AM, Slim Baltagi sbalt...@gmail.com wrote:

 Hi

 Apache Flink outperforms Apache Spark in processing machine learning 
 graph
 algorithms and relational queries but not in batch processing!

 The results were published in the proceedings of the 18th International
 Conference, Business Information Systems 2015, Poznań, Poland, June 24-26,
 2015.

 Thanks to our friend Google, Chapter 3: 'Evaluating New Approaches of Big
 Data Analytics Frameworks' by Norman Spangenberg, Martin Roth and Bogdan
 Franczyk is available for preview at http://goo.gl/WocQci on pages 28-37.

 Enjoy!

 Slim Baltagi
 http://www.SparkBigData.com




 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Benchmark-results-between-Flink-and-Spark-tp23626.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org





Re: Reading from CSV file with spark-csv_2.10

2015-02-05 Thread Jerry Lam
Hi Florin,

I might be wrong, but TimeStamp looks like a SQL keyword that the engine
gets confused by. If it is a column name in your table, you might want to
change it (see
https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Types).

I constantly work with CSV files in Spark. However, I didn't use the
spark-csv package; I did the parsing manually, so I cannot comment on
spark-csv.

HTH,

Jerry


On Thu, Feb 5, 2015 at 9:32 AM, Spico Florin spicoflo...@gmail.com wrote:

 Hello!
 I'm using spark-csv 2.10 with Java from the maven repository
 <groupId>com.databricks</groupId>
 <artifactId>spark-csv_2.10</artifactId>
 <version>0.1.1</version>

 I would like to use Spark-SQL to filter out my data. I'm using the
 following code:
 JavaSchemaRDD cars = new JavaCsvParser().withUseHeader(true).csvFile(
 sqlContext, logFile);
 cars.registerAsTable("mytable");
 JavaSchemaRDD doll = sqlContext.sql("SELECT TimeStamp FROM mytable");
 doll.saveAsTextFile("dolly.csv");

 but I'm getting the following error:
 Exception in thread "main" java.lang.RuntimeException: [1.8] failure:
 ``UNION'' expected but `TimeStamp' fo

 SELECT TimeStamp FROM mytablel
 at scala.sys.package$.error(package.scala:27)
 at
 org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)

 Can you please tell me what the best approach is to filter the CSV data
 with SQL?
 Thank you.
  Regards,
  Florin



Re: Union in Spark

2015-02-01 Thread Jerry Lam
Hi Deep,

what do you mean by stuck?

Jerry

On Mon, Feb 2, 2015 at 12:44 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 Hi,
 Is there any better operation than union? I am using union and the cluster
 is getting stuck with a large data set.

 Thank you



Re: Union in Spark

2015-02-01 Thread Jerry Lam
Hi Deep,

How do you know that the cluster is unresponsive because of the union?
Did you check the Spark web UI?
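
One thing worth checking there is the number of tasks: rdd1.union(rdd2) does
not shuffle, it simply concatenates the partitions of its inputs (when they do
not share a partitioner), so the partition and task count grows with every
union. A minimal sketch, with made-up RDDs, of checking and reducing the
partition count:

val a = sc.parallelize(1 to 1000000, 100)     // 100 partitions
val b = sc.parallelize(1 to 1000000, 100)     // 100 partitions

val unioned = a.union(b)
println(unioned.partitions.length)            // 200 = 100 + 100

// If many unions pile up, coalesce back to a manageable partition count
// before running further stages.
val compacted = unioned.coalesce(100)
println(compacted.partitions.length)          // 100

If you are unioning many RDDs in a loop, SparkContext.union(rdds) builds a
single UnionRDD instead of a deep chain of pairwise unions, which can also
help.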

Best Regards,

Jerry


On Mon, Feb 2, 2015 at 1:21 AM, Deep Pradhan pradhandeep1...@gmail.com
wrote:

 The cluster hangs.

 On Mon, Feb 2, 2015 at 11:25 AM, Jerry Lam chiling...@gmail.com wrote:

 Hi Deep,

 what do you mean by stuck?

 Jerry


 On Mon, Feb 2, 2015 at 12:44 AM, Deep Pradhan pradhandeep1...@gmail.com
 wrote:

 Hi,
 Is there any better operation than union? I am using union and the
 cluster is getting stuck with a large data set.

 Thank you






Re: Spark Team - Paco Nathan said that your team can help

2015-01-22 Thread Jerry Lam
Hi Sudipta,

I would also suggest asking this question on the Cloudera mailing list, since
you have HDFS, MapReduce and YARN requirements. Spark can work with HDFS and
YARN, but it acts more like a client to those clusters, and Cloudera can
provide services to answer your question more precisely. I'm not affiliated
with Cloudera, but they seem to be the only vendor that is very active in the
Spark project and also provides a Hadoop distribution.

HTH,

Jerry

btw, who is Paco Nathan?

On Thu, Jan 22, 2015 at 10:03 AM, Babu, Prashanth 
prashanth.b...@nttdata.com wrote:

  Sudipta,



 Use the Docker image [1] and play around with Hadoop and Spark in the VM
 for a while.

 Decide on your use case(s), and then you can move ahead with installing on
 a cluster, etc.

 This Docker image has all you want [HDFS + MapReduce + Spark + YARN].



 All the best!



 [1]: https://github.com/sequenceiq/docker-spark



 *From:* Sudipta Banerjee [mailto:asudipta.baner...@gmail.com]
 *Sent:* 22 January 2015 14:51
 *To:* Marco Shaw
 *Cc:* user@spark.apache.org
 *Subject:* Re: Spark Team - Paco Nathan said that your team can help



 Hi Marco,

 Thanks for the confirmation. Please let me know what further details you
 need to answer a very specific question: WHAT IS THE MINIMUM HARDWARE
 CONFIGURATION REQUIRED TO BUILD HDFS + MAPREDUCE + SPARK + YARN on a
 system? Please let me know if you need any further information, and if you
 don't know, please drive across with the $1 to Sir Paco Nathan and get me
 the answer.

 Thanks and Regards,

 Sudipta



 On Thu, Jan 22, 2015 at 5:33 PM, Marco Shaw marco.s...@gmail.com wrote:

 Hi,

 Let me reword your request so you understand how (too) generic your
 question is:

 "Hi, I have $10,000, please find me some means of transportation so I can
 get to work."

 Please provide (a lot) more details. If you can't, consider using one of
 the pre-built express VMs from either Cloudera, Hortonworks or MapR, for
 example.

 Marco




  On Jan 22, 2015, at 7:36 AM, Sudipta Banerjee 
 asudipta.baner...@gmail.com wrote:
 
 
 
  Hi Apache-Spark team ,
 
   What are the system requirements for installing Hadoop and Apache Spark?
  I have attached the screen shot of Gparted.
 
 
  Thanks and regards,
  Sudipta
 
 
 
 
  --
  Sudipta Banerjee
  Consultant, Business Analytics and Cloud Based Architecture
  Call me +919019578099

  Screenshot - Wednesday 21 January 2015 - 10:55:29 IST.png
 





 --

 Sudipta Banerjee

 Consultant, Business Analytics and Cloud Based Architecture

 Call me +919019578099



