exhaustive list of configuration options

2018-11-19 Thread Shiyuan
Hi Spark Users,
Is there a way to get an exhaustive list of configuration options and their
default values? The documentation page
https://spark.apache.org/docs/latest/configuration.html is not exhaustive,
and neither is the Spark UI's Environment tab.  Thank you!
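
For reference, a running session can dump the options it actually knows about, though this still is not a complete list of every possible setting. The sketch below assumes an active SparkSession named spark: SET -v lists the SQL configuration keys with their current or default values and a short description, while getConf().getAll() shows only what was explicitly set for this application.

# Minimal sketch, assuming an active SparkSession called `spark`.
# SQL configuration keys with their current/default values and descriptions:
spark.sql("SET -v").show(truncate=False)

# Only the entries explicitly set on this application's SparkConf:
for key, value in spark.sparkContext.getConf().getAll():
    print(key, "=", value)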


Exception thrown in awaitResult during application launch in yarn cluster

2018-05-18 Thread Shiyuan
Hi Spark-users,
   I am using PySpark on a YARN cluster. One of my Spark applications failed to
launch: only the driver container had started before the application failed in
the ACCEPTED state. The error message, attached below, is very short and I
cannot make sense of it. What are the possible causes of this error? Thank you!

18/05/18 06:33:09 INFO ApplicationMaster: Starting the user application in
a separate Thread
18/05/18 06:33:09 INFO ApplicationMaster: Waiting for spark context
initialization...
18/05/18 06:33:14 ERROR ApplicationMaster: User application exited with
status 2
18/05/18 06:33:14 INFO ApplicationMaster: Final app status: FAILED,
exitCode: 2, (reason: User application exited with status 2)
18/05/18 06:33:14 ERROR ApplicationMaster: Uncaught exception:
org.apache.spark.SparkException: Exception thrown in awaitResult:
        at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
        at org.apache.spark.deploy.yarn.ApplicationMaster.runDriver(ApplicationMaster.scala:428)
        at org.apache.spark.deploy.yarn.ApplicationMaster.run(ApplicationMaster.scala:281)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$main$1.apply$mcV$sp(ApplicationMaster.scala:783)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:67)
        at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:66)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
        at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:66)
        at org.apache.spark.deploy.yarn.ApplicationMaster$.main(ApplicationMaster.scala:781)
        at org.apache.spark.deploy.yarn.ApplicationMaster.main(ApplicationMaster.scala)
Caused by: org.apache.spark.SparkUserAppException: User application exited with 2
        at org.apache.spark.deploy.PythonRunner$.main(PythonRunner.scala:104)
        at org.apache.spark.deploy.PythonRunner.main(PythonRunner.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:654)
18/05/18 06:33:14 INFO ApplicationMaster: Unregistering ApplicationMaster
with FAILED (diag message: User application exited with status 2)
18/05/18 06:33:14 INFO ApplicationMaster: Deleting staging directory


Submit many spark applications

2018-05-16 Thread Shiyuan
Hi Spark-users,
 I want to submit as many Spark applications as the resources permit, using
cluster mode on a YARN cluster. YARN can queue and launch these applications
without problems; the problem lies in spark-submit itself. Each spark-submit
invocation starts a JVM, and these can fail with insufficient memory on the
machine where I run spark-submit when many of them are running at once.
Any suggestions on how to solve this problem? Thank you!
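
One possible mitigation, sketched below rather than a verified fix: in YARN cluster mode spark-submit acts only as a launcher, and with spark.yarn.submit.waitAppCompletion=false it exits right after the application is accepted instead of staying alive to poll YARN, so far fewer launcher JVMs are resident at once. The script names here are placeholders.

import subprocess

# Hypothetical launcher loop: each spark-submit returns as soon as the
# application is submitted, instead of waiting for it to finish.
apps = ["job_a.py", "job_b.py"]  # placeholder application scripts
for app in apps:
    subprocess.run([
        "spark-submit",
        "--master", "yarn",
        "--deploy-mode", "cluster",
        "--conf", "spark.yarn.submit.waitAppCompletion=false",
        app,
    ], check=True)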


Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

2018-04-11 Thread Shiyuan
Here it is :
https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/2991198123660769/823198936734135/866038034322120/latest.html


On Wed, Apr 11, 2018 at 10:55 AM, Alessandro Solimando <
alessandro.solima...@gmail.com> wrote:

> Hi Shiyuan,
> can you show us the output of ¨explain¨ over df (as a last step)?
>
> On 11 April 2018 at 19:47, Shiyuan <gshy2...@gmail.com> wrote:
>
>> Variable name binding is a Python thing, and Spark should not care how
>> the variable is named. What matters is the dependency graph. Spark fails to
>> handle this dependency graph correctly, which surprises me: this is just a
>> simple combination of three very common SQL operations.
>>
>>
>> On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi Shiyuan,
>>>
>>> I do not know whether I am right, but I would prefer to avoid
>>> expressions in Spark as:
>>>
>>> df = <>
>>>
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>>
>>>> Here is the pretty print of the analyzed logical plan, which reveals some
>>>> details about what causes the bug (see the lines marked with asterisks):
>>>> withColumnRenamed() fails to update the dependency graph correctly:
>>>>
>>>>
>>>> 'Resolved attribute(s) kk#144L missing from
>>>> ID#118,LABEL#119,kk#96L,score#121 in operator !Project [ID#118,
>>>> score#121, LABEL#119, kk#144L]. Attribute(s) with the same name appear in
>>>> the operation: kk. Please check if the right attribute(s) are used
>>>>
>>>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>>>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>>>:- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>>:  +- Join Inner, (ID#64 = ID#99)
>>>>: :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>>>: :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>>>: : +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>>>: +- Project [ID#99]
>>>>:+- Filter (nL#90L > cast(1 as bigint))
>>>>:   +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100)
>>>> AS nL#90L]
>>>>:  +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>>>: +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
>>>> score#102]
>>>>:+- LogicalRDD [ID#99, LABEL#100, k#101L,
>>>> score#102]
>>>>+- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>>>   +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
>>>> count#118L]
>>>>  +- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>> +- Join Inner, (ID#135 = ID#99)
>>>>:- Project [ID#135, score#138, LABEL#136, kk#128L]
>>>>:  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
>>>> score#138]*
>>>>: +- LogicalRDD [ID#135, LABEL#136, k#137L,
>>>> score#138]
>>>>+- Project [ID#99]
>>>>   +- Filter (nL#90L > cast(1 as bigint))
>>>>  +- Aggregate [ID#99], [ID#99, count(distinct
>>>> LABEL#100) AS nL#90L]
>>>> +- *!Project [ID#99, score#102, LABEL#100,
>>>> kk#128L]*
>>>>+-* Project [ID#99, LABEL#100, k#101L AS
>>>> kk#73L, score#102]*
>>>>   +- LogicalRDD [ID#99, LABEL#100, k#101L,
>>>> score#102]
>>>>
>>>> Here is the code which generates the error:
>>>>
>>>> import pyspark.sql.functions as F
>>>> from pyspark.sql import Row
>>>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)]).withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>>>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>>>> df = df.join(df_t.select("ID"),["ID"])
>>>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>>>> df = df.join(df_sw, ["ID","kk"])

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

2018-04-11 Thread Shiyuan
Variable name binding is a Python thing, and Spark should not care how the
variable is named. What matters is the dependency graph. Spark fails to handle
this dependency graph correctly, which surprises me: this is just a simple
combination of three very common SQL operations.


On Tue, Apr 10, 2018 at 9:03 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi Shiyuan,
>
> I do not know whether I am right, but I would prefer to avoid expressions
> in Spark as:
>
> df = <>
>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Apr 10, 2018 at 10:42 PM, Shiyuan <gshy2...@gmail.com> wrote:
>
>> Here is the pretty print of the analyzed logical plan, which reveals some
>> details about what causes the bug (see the lines marked with asterisks):
>> withColumnRenamed() fails to update the dependency graph correctly:
>>
>>
>> 'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
>> in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
>> with the same name appear in the operation: kk. Please check if the right
>> attribute(s) are used
>>
>> Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
>> +- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
>>:- Project [ID#64, score#67, LABEL#65, kk#73L]
>>:  +- Join Inner, (ID#64 = ID#99)
>>: :- Project [ID#64, score#67, LABEL#65, kk#73L]
>>: :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
>>: : +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
>>: +- Project [ID#99]
>>:+- Filter (nL#90L > cast(1 as bigint))
>>:   +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS
>> nL#90L]
>>:  +- Project [ID#99, score#102, LABEL#100, kk#73L]
>>: +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
>> score#102]
>>:+- LogicalRDD [ID#99, LABEL#100, k#101L,
>> score#102]
>>+- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
>>   +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
>> count#118L]
>>  +- Project [ID#135, score#138, LABEL#136, kk#128L]
>> +- Join Inner, (ID#135 = ID#99)
>>:- Project [ID#135, score#138, LABEL#136, kk#128L]
>>:  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
>> score#138]*
>>: +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
>>+- Project [ID#99]
>>   +- Filter (nL#90L > cast(1 as bigint))
>>  +- Aggregate [ID#99], [ID#99, count(distinct
>> LABEL#100) AS nL#90L]
>> +- *!Project [ID#99, score#102, LABEL#100,
>> kk#128L]*
>>+-* Project [ID#99, LABEL#100, k#101L AS
>> kk#73L, score#102]*
>>   +- LogicalRDD [ID#99, LABEL#100, k#101L,
>> score#102]
>>
>> Here is the code which generates the error:
>>
>> import pyspark.sql.functions as F
>> from pyspark.sql import Row
>> df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)]).withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>> df = df.join(df_t.select("ID"),["ID"])
>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>> df = df.join(df_sw, ["ID","kk"])
>>
>>
>> On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:
>>
>>> The spark warning about Row instead of Dict is not the culprit. The
>>> problem still persists after I use Row instead of Dict to generate the
>>> dataframe.
>>>
>>> Here is the explain() output for the reassignment of df that Gourav
>>> suggested running. The plans look the same except that the serial numbers
>>> following the columns are different (e.g. ID#7273 vs. ID#7344).
>>>
>>> this is the output of df.explain() after df =
>>> df.join(df_t.select("ID"),["ID"])
>>> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
>>> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
>>> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
>>> 200) : +- *(1) Proj

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

2018-04-10 Thread Shiyuan
Here is the pretty print of the analyzed logical plan, which reveals some
details about what causes the bug (see the lines marked with asterisks):
withColumnRenamed() fails to update the dependency graph correctly:


'Resolved attribute(s) kk#144L missing from ID#118,LABEL#119,kk#96L,score#121
in operator !Project [ID#118, score#121, LABEL#119, kk#144L]. Attribute(s)
with the same name appear in the operation: kk. Please check if the right
attribute(s) are used

Project [ID#64, kk#73L, score#67, LABEL#65, cnt1#123L]
+- Join Inner, ((ID#64 = ID#135) && (kk#73L = kk#128L))
   :- Project [ID#64, score#67, LABEL#65, kk#73L]
   :  +- Join Inner, (ID#64 = ID#99)
   : :- Project [ID#64, score#67, LABEL#65, kk#73L]
   : :  +- Project [ID#64, LABEL#65, k#66L AS kk#73L, score#67]
   : : +- LogicalRDD [ID#64, LABEL#65, k#66L, score#67]
   : +- Project [ID#99]
   :+- Filter (nL#90L > cast(1 as bigint))
   :   +- Aggregate [ID#99], [ID#99, count(distinct LABEL#100) AS
nL#90L]
   :  +- Project [ID#99, score#102, LABEL#100, kk#73L]
   : +- Project [ID#99, LABEL#100, k#101L AS kk#73L,
score#102]
   :+- LogicalRDD [ID#99, LABEL#100, k#101L, score#102]
   +- Project [ID#135, kk#128L, count#118L AS cnt1#123L]
  +- Aggregate [ID#135, kk#128L], [ID#135, kk#128L, count(1) AS
count#118L]
 +- Project [ID#135, score#138, LABEL#136, kk#128L]
+- Join Inner, (ID#135 = ID#99)
   :- Project [ID#135, score#138, LABEL#136, kk#128L]
   :  +- *Project [ID#135, LABEL#136, k#137L AS kk#128L,
score#138]*
   : +- LogicalRDD [ID#135, LABEL#136, k#137L, score#138]
   +- Project [ID#99]
  +- Filter (nL#90L > cast(1 as bigint))
 +- Aggregate [ID#99], [ID#99, count(distinct
LABEL#100) AS nL#90L]
+- *!Project [ID#99, score#102, LABEL#100, kk#128L]*
   +-* Project [ID#99, LABEL#100, k#101L AS kk#73L,
score#102]*
  +- LogicalRDD [ID#99, LABEL#100, k#101L,
score#102]

Here is the code which generates the error:

import pyspark.sql.functions as F
from pyspark.sql import Row
df = spark.createDataFrame([Row(score=1.0,ID='abc',LABEL=True,k=2),Row(score=1.0,ID='abc',LABEL=False,k=3)]).withColumnRenamed("k","kk").select("ID","score","LABEL","kk")
df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])


On Tue, Apr 10, 2018 at 1:37 PM, Shiyuan <gshy2...@gmail.com> wrote:

> The spark warning about Row instead of Dict is not the culprit. The
> problem still persists after I use Row instead of Dict to generate the
> dataframe.
>
> Here is the explain() output for the reassignment of df that Gourav
> suggested running. The plans look the same except that the serial numbers
> following the columns are different (e.g. ID#7273 vs. ID#7344).
>
> this is the output of df.explain() after df = df.join(df_t.select("ID"),["
> ID"])
> == Physical Plan == *(6) Project [ID#7273, score#7276, LABEL#7274,
> kk#7281L] +- *(6) SortMergeJoin [ID#7273], [ID#7303], Inner :- *(2) Sort
> [ID#7273 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7273,
> 200) : +- *(1) Project [ID#7273, score#7276, LABEL#7274, k#7275L AS
> kk#7281L] : +- *(1) Filter isnotnull(ID#7273) : +- *(1) Scan
> ExistingRDD[ID#7273,LABEL#7274,k#7275L,score#7276] +- *(5) Sort [ID#7303
> ASC NULLS FIRST], false, 0 +- *(5) Project [ID#7303] +- *(5) Filter
> (nL#7295L > 1) +- *(5) HashAggregate(keys=[ID#7303],
> functions=[finalmerge_count(distinct merge count#7314L) AS
> count(LABEL#7304)#7294L]) +- Exchange hashpartitioning(ID#7303, 200) +-
> *(4) HashAggregate(keys=[ID#7303], functions=[partial_count(distinct
> LABEL#7304) AS count#7314L]) +- *(4) HashAggregate(keys=[ID#7303,
> LABEL#7304], functions=[]) +- Exchange hashpartitioning(ID#7303,
> LABEL#7304, 200) +- *(3) HashAggregate(keys=[ID#7303, LABEL#7304],
> functions=[]) +- *(3) Project [ID#7303, LABEL#7304] +- *(3) Filter
> isnotnull(ID#7303) +- *(3) Scan 
> ExistingRDD[ID#7303,LABEL#7304,k#7305L,score#7306]
>
>
> In comparison, this is the output of df1.explain() after  df1 =
> df.join(df_t.select("ID"),["ID"])?
> == Physical Plan == *(6) Project [ID#7344, score#7347, LABEL#7345,
> kk#7352L] +- *(6) SortMergeJoin [ID#7344], [ID#7374], Inner :- *(2) Sort
> [ID#7344 ASC NULLS FIRST], false, 0 : +- Exchange hashpartitioning(ID#7344,
> 200) : +- *(1) Project [ID#7344, sco

Re: A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

2018-04-10 Thread Shiyuan
EL#151, k#152L, score#153], false
 +- Project [ID#118]
 +- Filter (nL#110L > cast(1 as bigint))
 +- Aggregate [ID#118], [ID#118, count(distinct LABEL#119) AS nL#110L]
 +- !Project [ID#118, score#121, LABEL#119, kk#144L]
 +- Project [ID#118, LABEL#119, k#120L AS kk#96L, score#121]
 +- LogicalRDD [ID#118, LABEL#119, k#120L, score#121], false




On Mon, Apr 9, 2018 at 3:21 PM, Gourav Sengupta <gourav.sengu...@gmail.com>
wrote:

> Hi,
>
> what I am curious about is the reassignment of df.
>
> Can you please look into the explain plan of df after the statement df =
> df.join(df_t.select("ID"),["ID"])? And then compare with the explain plan
> of df1 after the statement df1 = df.join(df_t.select("ID"),["ID"])?
>
> It's late here and I have yet to go through this completely, but I think
> that Spark does throw a warning telling us to use Row instead of a
> dictionary.
>
> It would help if you could kindly try using the statement below and go
> through your use case once again (I have yet to go through all the lines):
>
>
>
> from pyspark.sql import Row
>
> df = spark.createDataFrame([Row(score = 1.0,ID="abc",LABEL=True,k=2),
> Row(score = 1.0,ID="abc",LABEL=True,k=3)])
>
> Regards,
> Gourav Sengupta
>
>
> On Mon, Apr 9, 2018 at 6:50 PM, Shiyuan <gshy2...@gmail.com> wrote:
>
>> Hi Spark Users,
>> The following code snippet raises an "attribute missing" error even though
>> the attribute exists. The bug is triggered by a particular sequence of
>> "select", "groupby" and "join". Note that if I take away the "select" in
>> #line B, the code runs without error. However, the "select" in #line B
>> includes all columns in the dataframe and hence should not affect the
>> final result.
>>
>>
>> import pyspark.sql.functions as F
>> df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])
>>
>> df = df.withColumnRenamed("k","kk")\
>>   .select("ID","score","LABEL","kk")#line B
>>
>> df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
>> df = df.join(df_t.select("ID"),["ID"])
>> df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
>> df = df.join(df_sw, ["ID","kk"])
>>
>
>


A bug triggered by a particular sequence of "select", "groupby" and "join" in Spark 2.3.0

2018-04-09 Thread Shiyuan
Hi Spark Users,
The following code snippet raises an "attribute missing" error even though the
attribute exists. The bug is triggered by a particular sequence of "select",
"groupby" and "join". Note that if I take away the "select" in #line B, the
code runs without error. However, the "select" in #line B includes all columns
in the dataframe and hence should not affect the final result.


import pyspark.sql.functions as F
df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])

df = df.withColumnRenamed("k","kk")\
  .select("ID","score","LABEL","kk")#line B

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])
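
One workaround that has helped with this class of "resolved attribute(s) missing" errors, sketched here without a claim that it fixes this exact case: re-alias the columns of the aggregated side with toDF() before the join, so that side gets fresh attribute IDs and no longer shares lineage with df. The names follow the snippet above.

import pyspark.sql.functions as F

df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},
                            {'score':1.0,'ID':'abc','LABEL':False,'k':3}])
df = df.withColumnRenamed("k","kk").select("ID","score","LABEL","kk")

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
# Re-alias every column so the filtered side carries new attribute IDs,
# breaking the shared lineage that confuses the analyzer.
df_t = df_t.toDF(*df_t.columns)

df = df.join(df_t.select("ID"), ["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])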


Uncaught exception in thread heartbeat-receiver-event-loop-thread

2018-04-02 Thread Shiyuan
Hi,
I got an "Uncaught exception in thread heartbeat-receiver-event-loop-thread"
error. Does this error indicate that some node is too overloaded to be
responsive? Thanks!

ERROR Utils: Uncaught exception in thread
heartbeat-receiver-event-loop-thread
java.lang.NullPointerException
        at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:464)
        at org.apache.spark.util.CollectionAccumulator.value(AccumulatorV2.scala:439)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6$$anonfun$7.apply(TaskSchedulerImpl.scala:408)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6$$anonfun$7.apply(TaskSchedulerImpl.scala:408)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6.apply(TaskSchedulerImpl.scala:408)
        at org.apache.spark.scheduler.TaskSchedulerImpl$$anonfun$6.apply(TaskSchedulerImpl.scala:407)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
        at org.apache.spark.scheduler.TaskSchedulerImpl.executorHeartbeatReceived(TaskSchedulerImpl.scala:407)
        at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2$$anonfun$run$2.apply$mcV$sp(HeartbeatReceiver.scala:129)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1295)
        at org.apache.spark.HeartbeatReceiver$$anonfun$receiveAndReply$1$$anon$2.run(HeartbeatReceiver.scala:128)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)


Re: strange behavior of joining dataframes

2018-03-23 Thread Shiyuan
Here is a simple example that reproduces the problem. This code fails with a
missing attribute ('kk') error. Is it a bug? Note that if the `select` in
line B is removed, the code runs.

import pyspark.sql.functions as F
df = spark.createDataFrame([{'score':1.0,'ID':'abc','LABEL':True,'k':2},{'score':1.0,'ID':'abc','LABEL':False,'k':3}])

df = df.withColumnRenamed("k","kk")\
  .select("ID","score","LABEL","kk")#line B

df_t = df.groupby("ID").agg(F.countDistinct("LABEL").alias("nL")).filter(F.col("nL")>1)
df = df.join(df_t.select("ID"),["ID"])
df_sw = df.groupby(["ID","kk"]).count().withColumnRenamed("count", "cnt1")
df = df.join(df_sw, ["ID","kk"])

On Tue, Mar 20, 2018 at 9:58 PM, Shiyuan <gshy2...@gmail.com> wrote:

> Hi Spark-users:
> I have a dataframe "df_t" which was generated from other dataframes by
> several transformations. And then I  did something very simple,  just
> counting the rows, that is the following code:
>
> (A)
> df_t_1 =  df_t.groupby(["Id","key"]).count().withColumnRenamed("count",
> "cnt1")
> df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
> df_t_3 = df_t_1.join(df_t_2, ["Id"])
> df_t.join(df_t_3, ["Id","key"])
>
> When I run this query, I get an error that "key" is missing during the join.
> However, the column "key" is clearly in the dataframe df_t. What is strange
> is that if I first do this:
>
>  data = df_t.collect(); df_t = spark.createDataFrame(data);  (B)
>
> then (A) runs without error. However, (B) should not change the dataframe
> df_t at all. Why does snippet (A) run with (B) but fail without it? Also, a
> different joining sequence completes without error:
>
> (C)
> df_t_1 =  df_t.groupby(["Id","key"]).count().withColumnRenamed("count",
> "cnt1")
> df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
> df_t.join(df_t_1, ["Id","key"]).join(df_t_2, ["Id"])
>
> But (A) and (C) are conceptually the same and should produce the same
> result. What could possibly go wrong here? Any hints to track down the
> problem are appreciated. I am using Spark 2.1.
>
>
>
>
>


strange behavior of joining dataframes

2018-03-20 Thread Shiyuan
Hi Spark-users:
I have a dataframe "df_t" which was generated from other dataframes by
several transformations. Then I did something very simple, just counting
rows, with the following code:

(A)
df_t_1 = df_t.groupby(["Id","key"]).count().withColumnRenamed("count", "cnt1")
df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
df_t_3 = df_t_1.join(df_t_2, ["Id"])
df_t.join(df_t_3, ["Id","key"])

When I run this query, I get an error that "key" is missing during the join.
However, the column "key" is clearly in the dataframe df_t. What is strange
is that if I first do this:

 data = df_t.collect(); df_t = spark.createDataFrame(data);  (B)

then (A) runs without error. However, (B) should not change the dataframe
df_t at all. Why does snippet (A) run with (B) but fail without it? Also, a
different joining sequence completes without error:

(C)
df_t_1 = df_t.groupby(["Id","key"]).count().withColumnRenamed("count", "cnt1")
df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
df_t.join(df_t_1, ["Id","key"]).join(df_t_2, ["Id"])

But (A) and (C) are conceptually the same and should produce the same result.
What could possibly go wrong here? Any hints to track down the problem are
appreciated. I am using Spark 2.1.
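
A cheaper variant of workaround (B), sketched on the assumption that what matters is rebuilding df_t's lineage rather than round-tripping the rows through the driver: recreate the dataframe from its own RDD and schema, which produces a fresh plan with new attribute IDs while the data stays on the executors.

# Same effect as (B) without collect(): rebuild df_t from its RDD + schema.
df_t = spark.createDataFrame(df_t.rdd, df_t.schema)

# (A) as written above should then run against the rebuilt df_t.
df_t_1 = df_t.groupby(["Id","key"]).count().withColumnRenamed("count", "cnt1")
df_t_2 = df_t.groupby("Id").count().withColumnRenamed("count", "cnt2")
df_t_3 = df_t_1.join(df_t_2, ["Id"])
df_t.join(df_t_3, ["Id","key"])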


Insufficient memory for Java Runtime

2018-03-13 Thread Shiyuan
Hi Spark-Users,
  I encountered an "insufficient memory" problem. The error is logged in a
file named "hs_err_pid86252.log" (attached at the end of this email).

I launched the Spark job with "spark-submit --driver-memory 40g --master
yarn --deploy-mode client". The Spark session was created with 10 executors,
each with 60g of memory. The data access pattern is pretty simple: I keep
reading Spark dataframes from HDFS one by one, filter each one, join it with
another dataframe, and then append the result to an accumulating dataframe:

for i in [1, 2, 3, ...]:
    df1 = spark.read.parquet(file_i)
    df_r = df1.filter(...).join(df2)
    df_all = df_all.union(df_r)

Each file_i is quite small, only a few GB, but there are a lot of such files.
After filtering and joining, each df_r is also quite small. When the program
failed, df_all had only 10k rows, which should be around 10 GB. Each machine
in the cluster has around 80 GB of memory and 1 TB of disk space, and only
one user was using the cluster when it failed due to insufficient memory.
My questions are:
i). The log file shows that the JVM failed to commit about 8 GB of memory.
How could that happen when the driver and executors have more than 40 GB of
free memory? In fact, only transformations, and no actions, had run when the
program failed. As I understand it, only DAG construction and book-keeping
happen during dataframe transformations and no data is brought into memory,
so why does Spark still try to allocate such a large amount of memory?
ii). Could manually running garbage collection help?
iii). Did I mis-specify some runtime parameter for the JVM, YARN, or Spark?


Any help or references are appreciated!

The content of hs_err_pid86252.log:

# There is insufficient memory for the Java Runtime Environment to continue.
# Native memory allocation (mmap) failed to map 8663334912 bytes(~8G) for
committing reserved memory.
# Possible reasons:
#   The system is out of physical RAM or swap space
#   In 32 bit mode, the process size limit was hit
# Possible solutions:
#   Reduce memory load on the system
#   Increase physical memory or swap space
#   Check if swap backing store is full
#   Use 64 bit Java on a 64 bit OS
#   Decrease Java heap size (-Xmx/-Xms)
#   Decrease number of Java threads
#   Decrease Java thread stack sizes (-Xss)
#   Set larger code cache with -XX:ReservedCodeCacheSize=
# This output file may be truncated or incomplete.
#
#  Out of Memory Error (os_linux.cpp:2643), pid=86252,
tid=0x7fd69e683700
#
# JRE version: OpenJDK Runtime Environment (8.0_151-b12) (build
1.8.0_151-8u151-b12-0ubuntu0.16.04.2-b12)
# Java VM: OpenJDK 64-Bit Server VM (25.151-b12 mixed mode linux-amd64 )
# Failed to write core dump. Core dumps have been disabled. To enable core
dumping, try "ulimit -c unlimited" before starting Java again
#

---  T H R E A D  ---

Current thread (0x7fe0bc08c000):  VMThread [stack:
0x7fd69e583000,0x7fd69e684000] [id=86295]
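
One thing worth trying, sketched on the assumption that the ever-growing union lineage is what drives the allocation: read all of the input files in a single call and apply the filter and join once, instead of growing df_all inside a loop. The paths, filter predicate, and join key below are placeholders for the real ones.

import pyspark.sql.functions as F

files = ["hdfs:///data/file_1.parquet", "hdfs:///data/file_2.parquet"]  # placeholders
df_all = (spark.read.parquet(*files)            # one DataFrame over all files
               .filter(F.col("some_col") > 0)   # placeholder for the real filter
               .join(df2, "some_key"))          # placeholder join key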


Re: Why dataframe can be more efficient than dataset?

2017-04-09 Thread Shiyuan
Thank you for the detailed explanation! You point out two reasons why
Dataset is not as efficient as DataFrame:
1). Spark cannot look into lambdas and therefore cannot optimize them.
2). Type conversion occurs under the hood, e.g. from X to the internal row
format.

Just to check my understanding: some Dataset methods can also take a SQL
expression string instead of a lambda function. In this case, does the type
conversion still happen under the hood, so that Dataset is still not as
efficient as DataFrame? Here is the code:

//define a dataset and a dataframe with the same content; one is stored as a
//Dataset[Person], the other as a DataFrame (i.e. Dataset[Row])
scala> case class Person(name: String, age: Long)
scala> val ds = Seq(Person("A",32), Person("B", 18)).toDS
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
scala> val df = Seq(Person("A",32), Person("B", 18)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

//Which filtering is more efficient? Both use a SQL expression string.
scala> df.filter("age < 20")
res7: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name:
string, age: bigint]

scala> ds.filter("age < 20")
res8: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

On Sat, Apr 8, 2017 at 7:22 PM, Koert Kuipers <ko...@tresata.com> wrote:

> how would you use only relational transformations on dataset?
>
> On Sat, Apr 8, 2017 at 2:15 PM, Shiyuan <gshy2...@gmail.com> wrote:
>
>> Hi Spark-users,
>> I came across a few sources which mentioned DataFrame can be more
>> efficient than Dataset.  I can understand this is true because Dataset
>> allows functional transformation which Catalyst cannot look into and hence
>> cannot optimize well. But can DataFrame be more efficient than Dataset even
>> if we only use the relational transformation on dataset? If so, can anyone
>> give some explanation why  it is so? Any benchmark comparing dataset vs.
>> dataframe?   Thank you!
>>
>> Shiyuan
>>
>
>


Why dataframe can be more efficient than dataset?

2017-04-08 Thread Shiyuan
Hi Spark-users,
I came across a few sources which mention that DataFrame can be more efficient
than Dataset. I can understand this being true when Dataset is used with
functional transformations, which Catalyst cannot look into and hence cannot
optimize well. But can DataFrame be more efficient than Dataset even if we
only use relational transformations on the Dataset? If so, can anyone explain
why? Is there any benchmark comparing Dataset vs. DataFrame? Thank you!

Shiyuan


best practice for paralleling model training

2017-01-24 Thread Shiyuan
Hi spark users,
I am looking for a way to parallelize #A and #B in the code below. Since
dataframes in Spark are immutable, #A and #B are completely independent
operations.

My questions are:
1). As of Spark 2.1, #B only starts when #A has completed. Is that right?
2). What is the best way to parallelize #A and #B given an infinite number of
computing nodes?

Any explanations or pointers are appreciated!


df = spark.createDataFrame(...)

model1 = pipeline1.fit(df)   #A
model2 = pipeline2.fit(df)   #B
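
One way to run the two fits concurrently, sketched under the assumption that pipeline1, pipeline2, and df are defined as above and the driver can host both jobs: call fit() from separate Python threads, since Spark accepts jobs submitted from multiple threads within one application (a FAIR scheduler pool can keep one job from starving the other).

from concurrent.futures import ThreadPoolExecutor

# Each fit() submits its own Spark jobs; launching them from separate
# driver threads lets the cluster work on #A and #B at the same time.
with ThreadPoolExecutor(max_workers=2) as pool:
    f1 = pool.submit(pipeline1.fit, df)   #A
    f2 = pool.submit(pipeline2.fit, df)   #B
    model1, model2 = f1.result(), f2.result()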


Why StringIndexer uses double instead of int for indexing?

2017-01-21 Thread Shiyuan
Hi Spark,
StringIndexer uses double instead of int for indexing
(http://spark.apache.org/docs/latest/ml-features.html#stringindexer). What is
the rationale for using double as the index type? Would it be more appropriate
to use int for indexing (which would be consistent with other places such as
Vectors.sparse)?

Shiyuan
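
If an integer index is needed downstream, the indexed column can simply be cast after the fact, which fits with the rest of Spark ML treating numeric label and feature values as doubles. A minimal sketch with a made-up column name "category":

from pyspark.ml.feature import StringIndexer
import pyspark.sql.functions as F

# Hypothetical example data with a string column to index.
df = spark.createDataFrame([("a",), ("b",), ("a",)], ["category"])
indexed = (StringIndexer(inputCol="category", outputCol="categoryIndex")
           .fit(df).transform(df))

# StringIndexer emits a double; cast it if an int index is required.
indexed = indexed.withColumn("categoryIndexInt", F.col("categoryIndex").cast("int"))
indexed.show()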