Re: Live Streamed Code Review today at 11am Pacific

2018-07-19 Thread Holden Karau
Heads up: tomorrow's Friday review is going to be at 8:30 am instead of 9:30
am because I had to move some flights around.

On Fri, Jul 13, 2018 at 12:03 PM, Holden Karau  wrote:

> This afternoon @ 3pm pacific I'll be looking at review tooling for Spark &
> Beam https://www.youtube.com/watch?v=ff8_jbzC8JI.
>
> Next week's regular Friday code review (this time July 20th @ 9:30am pacific)
> will once again probably have more of an ML focus for folks interested in
> watching Spark ML PRs be reviewed -
> https://www.youtube.com/watch?v=aG5h99yb6XE
>
> Next week I'll have a live coding session with more of a Beam focus if you
> want to see something a bit different (but still related since Beam runs on
> Spark) with a focus on Python dependency management (which is a thing we
> are also exploring in Spark at the same time) -
> https://www.youtube.com/watch?v=Sv0XhS2pYqA on July 19th at 2pm pacific.
>
> P.S.
>
> You can follow me more generally as holdenkarau on YouTube and holdenkarau
> on Twitch to be notified even when I forget to send out the emails (which is
> pretty often).
>
> This morning I did another live review session that I forgot to ping the
> list about (
> https://www.youtube.com/watch?v=M_lRFptcGTI&list=PLRLebp9QyZtYF46jlSnIu2x1NDBkKa2uw&index=31 )
> and yesterday I did some live coding using PySpark and working on Sparkling ML -
> https://www.youtube.com/watch?v=kCnBDpNce9A&list=PLRLebp9QyZtYF46jlSnIu2x1NDBkKa2uw&index=32
>
> On Wed, Jun 27, 2018 at 10:44 AM, Holden Karau 
> wrote:
>
>> Today @ 1:30pm pacific I'll be looking at the current Spark 2.1.3 RC and
>> seeing how we validate Spark releases -
>> https://www.twitch.tv/events/VAg-5PKURQeH15UAawhBtw /
>> https://www.youtube.com/watch?v=1_XLrlKS26o .
>> Tomorrow @ 12:30 live PR reviews & Monday live coding -
>> https://youtube.com/user/holdenkarau & https://www.twitch.tv/holdenkarau/events .
>> Hopefully this can encourage more folks to help with RC validation & PR
>> reviews :)
>>
>> On Thu, Jun 14, 2018 at 6:07 AM, Holden Karau 
>> wrote:
>>
>>> Next week is Pride in San Francisco but I'm still going to do two quick
>>> sessions. One will be live coding with Apache Spark to collect ASF diversity
>>> information ( https://www.youtube.com/watch?v=OirnFnsU37A /
>>> https://www.twitch.tv/events/O1edDMkTRBGy0I0RCK-Afg ) on Monday at 9am
>>> pacific, and the other will be the regular Friday code review (
>>> https://www.youtube.com/watch?v=IAWm4OLRoyY /
>>> https://www.twitch.tv/events/v0qzXxnNQ_K7a8JYFsIiKQ ) also at 9am.
>>>
>>> On Thu, Jun 7, 2018 at 9:10 PM, Holden Karau 
>>> wrote:
>>>
 I'll be doing another one tomorrow morning at 9am pacific focused on
 Python + K8s support & improved JSON support -
 https://www.youtube.com/watch?v=Z7ZEkvNwneU &
 https://www.twitch.tv/events/xU90q9RGRGSOgp2LoNsf6A :)

 On Fri, Mar 9, 2018 at 3:54 PM, Holden Karau 
 wrote:

> If anyone wants to watch the recording:
> https://www.youtube.com/watch?v=lugG_2QU6YU
>
> I'll do one next week as well - March 16th @ 11am -
> https://www.youtube.com/watch?v=pXzVtEUjrLc
>
> On Fri, Mar 9, 2018 at 9:28 AM, Holden Karau 
> wrote:
>
>> Hi folks,
>>
>> If you're curious about learning more about how Spark is developed, I'm
>> going to experiment with doing a live code review where folks can watch and
>> see how that part of our process works. I have two volunteers already for
>> having their PRs looked at live, and if you have a Spark PR you're working
>> on that you'd like me to livestream a review of, please ping me.
>>
>> The livestream will be at https://www.youtube.com/watch?v=lugG_2QU6YU
>> .
>>
>> Cheers,
>>
>> Holden :)
-- 
Twitter: https://twitter.com/holdenkarau


Re: Arrow type issue with Pandas UDF

2018-07-19 Thread Bryan Cutler
Hi Patrick,

It looks like it's failing in Scala before it even gets to Python to
execute your UDF, which is why it doesn't seem to matter what's in your
UDF. Since you are doing a grouped map UDF, maybe your group sizes are too
big or skewed? Could you try to reduce the size of your groups by adding
more keys or sampling a fraction of the data? If the problem persists, could
you file a JIRA? At the very least, a better exception would be nice.

Bryan

On Thu, Jul 19, 2018, 7:07 AM Patrick McCarthy
 wrote:

> PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.
>
> I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions
> in the last stage of the job regardless of my output type.
>
>
> The problem I'm trying to solve:
> I have a column of scalar values, and each value on the same row has a
> sorted vector. I'm trying to replace each scalar value with its closest
> index from its vector. I'm applying the grouping arbitrarily and performing
> a python operation row-wise because even when the same vector appears on
> many rows it's not clear how I would get the lookup to scale.
>
> My input data, the product of a join of hive tables, has the following
> schema:
>
> root
>  |-- scalar_value: float (nullable = true)
>  |-- quantilelist: array (nullable = true)
>  ||-- element: double (containsNull = true)
>
>
> My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to perform
> an operation on two columns, and because I want to take advantage of Arrow
> to avoid serialization.
>
> The schema my UDF returns is this:
>
> pos_schema = T.StructType([
>     T.StructField('feature_value', T.FloatType(), True),
>     T.StructField('error', T.StringType())
> ])
>
> ...however when I try to apply my UDF, either with saveAsTable or show(),
> I get the following exception:
>
> org.apache.arrow.vector.util.OversizedAllocationException: Unable to
> expand the buffer
> at
> org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
> at
> org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
> at
> org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
> at
> org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
> at
> org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
> at
> org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
> at
> org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
> at
> org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
> at
> org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
> at
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
> at
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
> at
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
> at
> org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
> at
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
> at
> org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
> at
> org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)
>
> I assumed it was the result of some bad typing on my part, until I did a
> test with a degenerate UDF that only returns a column of 1:
>
> @F.pandas_udf(T.StructType([T.StructField('feature_value', T.IntegerType(), True)]),
>               F.PandasUDFType.GROUPED_MAP)
> def groupedPercentileInt(df):
>     return pd.DataFrame({'feature_value': [int(1)] * df.shape[0]}).reset_index(drop=True)
>
>
> This clearly only has one return value of type int, yet I get the same
> exception:
>
> org.apache.arrow.vector.util.OversizedAllocationException: Unable to
> expand the buffer
> at
> org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
> at
> org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
> at
> org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
> at
> org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
> at
> org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
> at
> org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
> at
> org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
> 

Re: Multiple joins with same Dataframe throws AnalysisException: resolved attribute(s)

2018-07-19 Thread Joel D
One workaround is to rename the fid column on one of the DataFrames before each join, for example:
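A minimal sketch against the code quoted below (df2b is a hypothetical name; the column names follow the original snippet):

// Rename df2's columns before the second join so the analyzer sees distinct attributes.
val df2b = df2
  .withColumnRenamed("fId", "fId2")
  .withColumnRenamed("id", "id2")
  .withColumnRenamed("value", "value2")

val join2 = join1
  .join(df2b, join1("mgrId") === df2b("fId2"), "left")
  .select(join1("id"), join1("value"), join1("mgrId"), join1("name"),
    join1("df2id"), join1("fid"), df2b("fId2").as("df2fid"))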

On Thu, Jul 19, 2018 at 9:50 PM  wrote:

> Spark 2.3.0 has this problem; upgrade it to 2.3.1.
>
> Sent from my iPhone
>
> On Jul 19, 2018, at 2:13 PM, Nirav Patel  wrote:
>
> Corrected subject line. It's a missing attribute error, not an ambiguous
> reference error.
>
> On Thu, Jul 19, 2018 at 2:11 PM, Nirav Patel 
> wrote:
>
>> I am getting a missing attribute error after joining dataframe 'df2' twice.
>>
>> Exception in thread "main" org.apache.spark.sql.AnalysisException:
>> resolved attribute(s) *fid#49 *missing from
>> value#14,value#126,mgrId#15,name#16,d31#109,df2Id#125,df2Id#47,d4#130,d3#129,df1Id#13,name#128,
>> *fId#127* in operator !Join LeftOuter, (mgrId#15 = fid#49);;
>>
>> !Join LeftOuter, (mgrId#15 = fid#49)
>>
>> :- Project [df1Id#13, value#14, mgrId#15, name#16, df2Id#47, d3#51 AS
>> d31#109]
>>
>> :  +- Join Inner, (df1Id#13 = fid#49)
>>
>> : :- Project [_1#6 AS df1Id#13, _2#7 AS value#14, _3#8 AS mgrId#15,
>> _4#9 AS name#16, _5#10 AS d1#17, _6#11 AS d2#18]
>>
>> : :  +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
>>
>> : +- Project [_1#40 AS df2Id#47, _2#41 AS value#48, _3#42 AS fId#49,
>> _4#43 AS name#50, _5#44 AS d3#51, _6#45 AS d4#52]
>>
>> :+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>>
>> +- Project [_1#40 AS df2Id#125, _2#41 AS value#126, _3#42 AS fId#127,
>> _4#43 AS name#128, _5#44 AS d3#129, _6#45 AS d4#130]
>>
>>+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>>
>>
>> at
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(
>> CheckAnalysis.scala:40)
>>
>> at
>>
>>
>> As you can see, "fid" is present, but Spark is looking for fid#49 while
>> there is another one, fid#127.
>>
>> Physical Plan of original df2 is
>>
>> == Physical Plan ==
>>
>> LocalTableScan [df2Id#47, value#48, fId#49, name#50, d3#51, d4#52]
>>
>>
>> But looking at the physical plan, it seems multiple versions of 'fid'
>> get generated (fid#49, fid#127).
>>
>> Here's the full code.
>>
>>
>> Code:
>>
>> val seq1 = Seq(
>>
>> (1,"a",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (2,"a",0,"bla", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>>
>> (3,"a",2,"bla", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>>
>> (4,"bb",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (5,"bb",2,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (6,"bb",0,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>>
>> //val rdd1 = spark.sparkContext.parallelize(seq1)
>>
>> val df1= seq1.toDF("id","value","mgrId", "name", "d1", "d2")
>>
>> df1.show()
>>
>>
>>
>> val seq2 = Seq(
>>
>> (1,"a1",1,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (2,"a2",1,"duh", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>>
>> (3,"a3",2,"jah", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>>
>> (4,"a4",3,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (5,"a5",4,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>>
>> (6,"a6",5,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>>
>>
>>
>>
>>
>> val df2 = seq2.toDF("id","value","fId", "name", "d1", "d2")
>>
>> df2.explain()
>>
>> df2.show()
>>
>>
>>
>> val join1 = df1
>>
>>   .join(df2,
>>
>> df1("id") === df2("fid"))
>>
>>   .select(df1("id"), df1("value"), df1("mgrId"), df1("name"),
>>     df2("id").as("df2id"), df2("fid"), df2("value"))
>>
>> join1.printSchema()
>>
>> join1.show()
>>
>>
>>
>> val join2 = join1
>>
>>   .join(df2,
>>
>>   join1("mgrId") === df2("fid"),
>>
>>   "left")
>>
>>.select(join1("id"), join1("value"), join1("mgrId"), join1("name"),
>> join1("df2id"),
>>
>>join1("fid"), df2("fid").as("df2fid"))
>>
>> join2.printSchema()
>>
>> join2.show()
>>
>>
>>
>>
>>
>>
>>
>
>
>
>


Re: Multiple joins with same Dataframe throws AnalysisException: resolved attribute(s)

2018-07-19 Thread kanth909
Spark 2.3.0 has this problem; upgrade it to 2.3.1.

Sent from my iPhone

> On Jul 19, 2018, at 2:13 PM, Nirav Patel  wrote:
> 
> Corrected subject line. It's a missing attribute error, not an ambiguous
> reference error.
> 
>> On Thu, Jul 19, 2018 at 2:11 PM, Nirav Patel  wrote:
>> I am getting a missing attribute error after joining dataframe 'df2' twice.
>> 
>> Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved 
>> attribute(s) fid#49 missing from 
>> value#14,value#126,mgrId#15,name#16,d31#109,df2Id#125,df2Id#47,d4#130,d3#129,df1Id#13,name#128,fId#127
>>  in operator !Join LeftOuter, (mgrId#15 = fid#49);;
>> !Join LeftOuter, (mgrId#15 = fid#49)
>> :- Project [df1Id#13, value#14, mgrId#15, name#16, df2Id#47, d3#51 AS 
>> d31#109]
>> :  +- Join Inner, (df1Id#13 = fid#49)
>> : :- Project [_1#6 AS df1Id#13, _2#7 AS value#14, _3#8 AS mgrId#15, _4#9 
>> AS name#16, _5#10 AS d1#17, _6#11 AS d2#18]
>> : :  +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
>> : +- Project [_1#40 AS df2Id#47, _2#41 AS value#48, _3#42 AS fId#49, 
>> _4#43 AS name#50, _5#44 AS d3#51, _6#45 AS d4#52]
>> :+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>> +- Project [_1#40 AS df2Id#125, _2#41 AS value#126, _3#42 AS fId#127, _4#43 
>> AS name#128, _5#44 AS d3#129, _6#45 AS d4#130]
>>+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>> 
>>  at 
>> org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
>>  at 
>> 
>> 
>> As you can see, "fid" is present, but Spark is looking for fid#49 while
>> there is another one, fid#127.
>> 
>> Physical Plan of original df2 is 
>> == Physical Plan ==
>> LocalTableScan [df2Id#47, value#48, fId#49, name#50, d3#51, d4#52]
>> 
>> But looking at the physical plan, it seems multiple versions of 'fid'
>> get generated (fid#49, fid#127).
>> 
>> Here's the full code.
>> 
>> 
>> Code:
>> 
>> val seq1 = Seq(
>> (1,"a",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>> (2,"a",0,"bla", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>> (3,"a",2,"bla", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>> (4,"bb",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>> (5,"bb",2,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>> (6,"bb",0,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>> //val rdd1 = spark.sparkContext.parallelize(seq1)
>> val df1= seq1.toDF("id","value","mgrId", "name", "d1", "d2")
>> df1.show()
>> 
>> val seq2 = Seq(
>> (1,"a1",1,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>> (2,"a2",1,"duh", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>> (3,"a3",2,"jah", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>> (4,"a4",3,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>> (5,"a5",4,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>> (6,"a6",5,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>> 
>> 
>> val df2 = seq2.toDF("id","value","fId", "name", "d1", "d2")
>> df2.explain()
>> df2.show()
>> 
>> val join1 = df1
>>   .join(df2,
>> df1("id") === df2("fid"))
>>   .select(df1("id"), df1("value"), df1("mgrId"), df1("name"), 
>> df2("id").as("df2id"), df2("fid"), df2("value"))
>> join1.printSchema()  
>> join1.show()
>> 
>> val join2 = join1
>>   .join(df2,
>>   join1("mgrId") === df2("fid"),
>>   "left")
>>.select(join1("id"), join1("value"), join1("mgrId"), join1("name"), 
>> join1("df2id"), 
>>join1("fid"), df2("fid").as("df2fid"))   
>> join2.printSchema()  
>> join2.show()  
>> 
>> 
>> 
>> 
>> 
>> 
> 
> 
> 
> 
> 
> 
> 


Parquet

2018-07-19 Thread amin mohebbi
We have two big tables, each with 5 billion rows, so my question here is:
should we partition/sort the data and convert it to Parquet before doing
any join?
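For illustration only, here is a rough sketch of what bucketing one table into Parquet on the join key could look like (the table name, join key, and bucket count are hypothetical, not from this thread):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.getOrCreate()

// Write one of the big tables as Parquet, bucketed and sorted on the join key.
spark.read.table("big_table_a")    // hypothetical source table
  .write
  .format("parquet")
  .bucketBy(2000, "join_key")      // bucket count should be tuned to the data size
  .sortBy("join_key")
  .saveAsTable("big_table_a_bucketed")

If both tables are bucketed and sorted on the join key with the same number of buckets, Spark can usually do a sort-merge join without a full shuffle of either table, which tends to help at this scale.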
Best Regards ... Amin 
Mohebbi PhD candidate in Software Engineering   at university of Malaysia   Tel 
: +60 18 2040 017 E-Mail : tp025...@ex.apiit.edu.my   
amin_...@me.com

Re: Multiple joins with same Dataframe throws AnalysisException: resolved attribute(s)

2018-07-19 Thread Nirav Patel
Corrected subject line. It's a missing attribute error, not an ambiguous
reference error.

On Thu, Jul 19, 2018 at 2:11 PM, Nirav Patel  wrote:

> I am getting a missing attribute error after joining dataframe 'df2' twice.
>
> Exception in thread "main" org.apache.spark.sql.AnalysisException:
> resolved attribute(s) *fid#49 *missing from value#14,value#126,mgrId#15,
> name#16,d31#109,df2Id#125,df2Id#47,d4#130,d3#129,df1Id#13,name#128,
> *fId#127* in operator !Join LeftOuter, (mgrId#15 = fid#49);;
>
> !Join LeftOuter, (mgrId#15 = fid#49)
>
> :- Project [df1Id#13, value#14, mgrId#15, name#16, df2Id#47, d3#51 AS
> d31#109]
>
> :  +- Join Inner, (df1Id#13 = fid#49)
>
> : :- Project [_1#6 AS df1Id#13, _2#7 AS value#14, _3#8 AS mgrId#15,
> _4#9 AS name#16, _5#10 AS d1#17, _6#11 AS d2#18]
>
> : :  +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]
>
> : +- Project [_1#40 AS df2Id#47, _2#41 AS value#48, _3#42 AS fId#49,
> _4#43 AS name#50, _5#44 AS d3#51, _6#45 AS d4#52]
>
> :+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>
> +- Project [_1#40 AS df2Id#125, _2#41 AS value#126, _3#42 AS fId#127,
> _4#43 AS name#128, _5#44 AS d3#129, _6#45 AS d4#130]
>
>+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]
>
>
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.
> failAnalysis(CheckAnalysis.scala:40)
>
> at
>
>
> As you can see, "fid" is present, but Spark is looking for fid#49 while
> there is another one, fid#127.
>
> Physical Plan of original df2 is
>
> == Physical Plan ==
>
> LocalTableScan [df2Id#47, value#48, fId#49, name#50, d3#51, d4#52]
>
>
> But looking at the physical plan, it seems multiple versions of 'fid'
> get generated (fid#49, fid#127).
>
> Here's the full code.
>
>
> Code:
>
> val seq1 = Seq(
>
> (1,"a",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (2,"a",0,"bla", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>
> (3,"a",2,"bla", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>
> (4,"bb",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (5,"bb",2,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (6,"bb",0,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>
> //val rdd1 = spark.sparkContext.parallelize(seq1)
>
> val df1= seq1.toDF("id","value","mgrId", "name", "d1", "d2")
>
> df1.show()
>
>
>
> val seq2 = Seq(
>
> (1,"a1",1,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (2,"a2",1,"duh", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
>
> (3,"a3",2,"jah", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
>
> (4,"a4",3,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (5,"a5",4,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
>
> (6,"a6",5,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))
>
>
>
>
>
> val df2 = seq2.toDF("id","value","fId", "name", "d1", "d2")
>
> df2.explain()
>
> df2.show()
>
>
>
> val join1 = df1
>
>   .join(df2,
>
> df1("id") === df2("fid"))
>
>   .select(df1("id"), df1("value"), df1("mgrId"), df1("name"),
>     df2("id").as("df2id"), df2("fid"), df2("value"))
>
> join1.printSchema()
>
> join1.show()
>
>
>
> val join2 = join1
>
>   .join(df2,
>
>   join1("mgrId") === df2("fid"),
>
>   "left")
>
>.select(join1("id"), join1("value"), join1("mgrId"), join1("name"),
> join1("df2id"),
>
>join1("fid"), df2("fid").as("df2fid"))
>
> join2.printSchema()
>
> join2.show()
>
>
>
>
>
>
>


Multiple joins with same Dataframe throws Ambiguous reference error

2018-07-19 Thread Nirav Patel
I am getting a missing attribute error after joining dataframe 'df2' twice.

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved
attribute(s) *fid#49 *missing from
value#14,value#126,mgrId#15,name#16,d31#109,df2Id#125,df2Id#47,d4#130,d3#129,df1Id#13,name#128,
*fId#127* in operator !Join LeftOuter, (mgrId#15 = fid#49);;

!Join LeftOuter, (mgrId#15 = fid#49)

:- Project [df1Id#13, value#14, mgrId#15, name#16, df2Id#47, d3#51 AS
d31#109]

:  +- Join Inner, (df1Id#13 = fid#49)

: :- Project [_1#6 AS df1Id#13, _2#7 AS value#14, _3#8 AS mgrId#15,
_4#9 AS name#16, _5#10 AS d1#17, _6#11 AS d2#18]

: :  +- LocalRelation [_1#6, _2#7, _3#8, _4#9, _5#10, _6#11]

: +- Project [_1#40 AS df2Id#47, _2#41 AS value#48, _3#42 AS fId#49,
_4#43 AS name#50, _5#44 AS d3#51, _6#45 AS d4#52]

:+- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]

+- Project [_1#40 AS df2Id#125, _2#41 AS value#126, _3#42 AS fId#127, _4#43
AS name#128, _5#44 AS d3#129, _6#45 AS d4#130]

   +- LocalRelation [_1#40, _2#41, _3#42, _4#43, _5#44, _6#45]


at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(
CheckAnalysis.scala:40)

at


As you can see, "fid" is present, but Spark is looking for fid#49 while there
is another one, fid#127.

Physical Plan of original df2 is

== Physical Plan ==

LocalTableScan [df2Id#47, value#48, fId#49, name#50, d3#51, d4#52]


But looking at the physical plan, it seems multiple versions of 'fid'
get generated (fid#49, fid#127).

Here's the full code.


Code:

val seq1 = Seq(
  (1,"a",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (2,"a",0,"bla", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
  (3,"a",2,"bla", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
  (4,"bb",1,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (5,"bb",2,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (6,"bb",0,"bla", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))

//val rdd1 = spark.sparkContext.parallelize(seq1)
val df1 = seq1.toDF("id", "value", "mgrId", "name", "d1", "d2")
df1.show()

val seq2 = Seq(
  (1,"a1",1,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (2,"a2",1,"duh", "2014-01-01 00:00:00", "2014-09-12 18:55:43"),
  (3,"a3",2,"jah", "2000-12-01 00:00:00", "2000-01-01 00:00:00"),
  (4,"a4",3,"duh", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (5,"a5",4,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"),
  (6,"a6",5,"jah", "2014-01-01 00:00:00", "2014-01-01 00:00:00"))

val df2 = seq2.toDF("id", "value", "fId", "name", "d1", "d2")
df2.explain()
df2.show()

val join1 = df1
  .join(df2, df1("id") === df2("fid"))
  .select(df1("id"), df1("value"), df1("mgrId"), df1("name"),
    df2("id").as("df2id"), df2("fid"), df2("value"))
join1.printSchema()
join1.show()

val join2 = join1
  .join(df2, join1("mgrId") === df2("fid"), "left")
  .select(join1("id"), join1("value"), join1("mgrId"), join1("name"),
    join1("df2id"), join1("fid"), df2("fid").as("df2fid"))
join2.printSchema()
join2.show()



Re: [STRUCTURED STREAM] Join static dataset in state function (flatMapGroupsWithState)

2018-07-19 Thread Christiaan Ras
Hi Gerard,

First, I'd like to thank you for your fast reply and for directing my question to
the proper mailing list!
I established the JDBC connection in the context of the state function
(flatMapGroupsWithState). The JDBC connection is made using read on the
SparkSession, like below:
spark.read
  .format("jdbc")
  .option("url", s"jdbc:postgresql://${connection.host}/${connection.db}")
  .option("dbtable", s"table")
  .option("user", connection.user)
  .option("password", connection.pwd)
  .option("driver", "org.postgresql.Driver")
  .option("numPartitions", "5")
  .load()
  .as(Encoders.product[Class])

I use a ‘shared’ SparkSession, initialized in main (run by the driver, I guess),
but made accessible through a singleton to other classes. The class which handles
the state function fetches the shared session from this singleton.
My test ran locally with a single thread, so all logging should be visible on the
console.

BTW: I have now implemented the approach of joining this datasource with the
streaming source before feeding it to the state function. That works (a rough
sketch of that approach is below)! But I am still curious how to do this inside
flatMapGroupsWithState - or whether the state functions simply have not been
designed to do such things…
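For reference, a minimal sketch of that join-before-the-state-function approach (Event, Enriched, DeviceState, updateState, and the device_info table are hypothetical placeholders, not the actual code of this job):

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(deviceId: String, value: Double)
case class Enriched(deviceId: String, value: Double, region: String)
case class DeviceState(count: Long)

// Illustrative state function: just counts the rows seen per key.
def updateState(key: String, rows: Iterator[Enriched],
                state: GroupState[DeviceState]): Iterator[(String, Long)] = {
  val count = state.getOption.map(_.count).getOrElse(0L) + rows.size
  state.update(DeviceState(count))
  Iterator((key, count))
}

val spark = SparkSession.builder.getOrCreate()
import spark.implicits._

// Static side: the JDBC table, read once on the driver
// (assumed to contain a deviceId column to join on).
val lookupDf = spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://host/db")
  .option("dbtable", "device_info")
  .load()

// Streaming side: stands in for the parsed Kafka stream of the original job.
val events: Dataset[Event] = ???

// Join first, then feed the enriched stream to the state function.
val enriched = events.join(lookupDf, Seq("deviceId")).as[Enriched]

val counts = enriched
  .groupByKey(_.deviceId)
  .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout)(updateState _)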

Regards,

Chris

From: Gerard Maas 
Date: Thursday, 19 July 2018 at 15:20
To: Christiaan Ras , spark users 

Subject: Re: [STRUCTURED STREAM] Join static dataset in state function 
(flatMapGroupsWithState)

Hi Chris,

Could you show the code you are using? When you mention "I like to use a static 
datasource (JDBC) in the state function" do you refer to a DataFrame from a 
JDBC source or an independent JDBC connection?

The key point to consider is that the flatMapGroupsWithState function must be 
serializable. Its execution happens in the workers of a Spark job.

If you are using a JDBC connection, you need to make sure the connection is 
made in the context of the function. JDBC connections are not serializable.
Likewise, Dataset/DataFrames only function in the driver where they are 
defined. They are bound to the Spark Session in the driver and it does not make 
sense to access them in a remote executor.

Make sure you check the executor logs as well. There might be a 
NullPointerException lurking somewhere in your logs.

met vriendelijke groeten, Gerard.

PS: spark-dev (d...@spark.apache.org) is for 
discussions about open source development of the Spark project.
For general questions like this, use the user's  mailing list 
(user@spark.apache.org)  (note that I changed 
that address in the to: )

On Thu, Jul 19, 2018 at 12:51 PM Christiaan Ras 
mailto:christiaan@semmelwise.nl>> wrote:
I use the state function flatmapgroupswithstate to track state of a kafka 
stream. To further customize the state function I like to use a static 
datasource (JDBC) in the state function. This datasource contains data I like 
to join with the stream (as Iterator) within flatmapgroupswithstate.

When I try to access the JDBC source within flatmapgroupswithstate Spark 
execution freezes without any Exceptions or logging.
To verify the JDBC connection works, I also tried to access the source outside 
the state function and that works. So now I join the static source with 
streaming source before feeding it to flatmapgroupswithstate. It seems to work 
so far…

Any ideas why accessing the JDBC source within flatmapgroupswithstate could 
fail (freezes Spark execution)? Is it wise to use external datasources within 
flatmapgroupswithstate?

Thanks,
Chris




Compute /Storage Calculation

2018-07-19 Thread Deepu Raj
Hi Team - Is there a good calculator/Excel sheet to estimate compute and storage
requirements for new Spark jobs to be developed?

Capacity planning would be based on: job type, data type, etc.


Thanks,

Deepu Raj


Arrow type issue with Pandas UDF

2018-07-19 Thread Patrick McCarthy
PySpark 2.3.1 on YARN, Python 3.6, PyArrow 0.8.

I'm trying to run a pandas UDF, but I seem to get nonsensical exceptions in
the last stage of the job regardless of my output type.


The problem I'm trying to solve:
I have a column of scalar values, and each value on the same row has a
sorted vector. I'm trying to replace each scalar value with its closest
index from its vector. I'm applying the grouping arbitrarily and performing
a python operation row-wise because even when the same vector appears on
many rows it's not clear how I would get the lookup to scale.

My input data, the product of a join of hive tables, has the following
schema:

root
 |-- scalar_value: float (nullable = true)
 |-- quantilelist: array (nullable = true)
 ||-- element: double (containsNull = true)


My UDF is at bottom. I'm using a GROUPED_MAP UDF because I want to perform
an operation on two columns, and because I want to take advantage of Arrow
to avoid serialization.

The schema my UDF returns is this:

pos_schema = T.StructType([
    T.StructField('feature_value', T.FloatType(), True),
    T.StructField('error', T.StringType())
])

...however when I try to apply my UDF, either with saveAsTable or show(), I
get the following exception:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand
the buffer
at
org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
at
org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
at
org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
at
org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
at
org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95)
at
org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215)
at
org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991)
at
org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170)

I assumed it was the result of some bad typing on my part, until I did a
test with a degenerate UDF that only returns a column of 1:

@F.pandas_udf(T.StructType([T.StructField('feature_value', T.IntegerType(), True)]),
              F.PandasUDFType.GROUPED_MAP)
def groupedPercentileInt(df):
    return pd.DataFrame({'feature_value': [int(1)] * df.shape[0]}).reset_index(drop=True)


This clearly only has one return value of type int, yet I get the same
exception:

org.apache.arrow.vector.util.OversizedAllocationException: Unable to expand
the buffer
at
org.apache.arrow.vector.BaseFixedWidthVector.reallocBufferHelper(BaseFixedWidthVector.java:447)
at
org.apache.arrow.vector.BaseFixedWidthVector.reAlloc(BaseFixedWidthVector.java:426)
at
org.apache.arrow.vector.BaseFixedWidthVector.handleSafe(BaseFixedWidthVector.java:838)
at
org.apache.arrow.vector.Float8Vector.setSafe(Float8Vector.java:221)
at
org.apache.spark.sql.execution.arrow.DoubleWriter.setValue(ArrowWriter.scala:223)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrayWriter.setValue(ArrowWriter.scala:308)
at
org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122)
at
org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at
org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380)
at
org.

[Spark Structured Streaming on K8S]: Debug - File handles/descriptor (unix pipe) leaking

2018-07-19 Thread Abhishek Tripathi
Hello All!
I am using Spark 2.3.1 on Kubernetes to run a structured streaming Spark
job which reads a stream from Kafka, performs some window aggregation, and
writes the output sink to Kafka.
After the job has been running for a few hours (5-6 hours), the executor pods
crash, caused by "Too many open files in system".
Digging in further with the "lsof" command, I can see a lot of UNIX pipes
being opened.

# lsof -p 14 | tail
java 14 root *112u  a_inode   0,100  8838
[eventpoll]
java 14 root *113r FIFO0,9  0t0 252556158 pipe
java 14 root *114w FIFO0,9  0t0 252556158 pipe
java 14 root *115u  a_inode   0,100  8838
[eventpoll]
java 14 root *119r FIFO0,9  0t0 252552868 pipe
java 14 root *120w FIFO0,9  0t0 252552868 pipe
java 14 root *121u  a_inode   0,100  8838
[eventpoll]
java 14 root *131r FIFO0,9  0t0 252561014 pipe
java 14 root *132w FIFO0,9  0t0 252561014 pipe
java 14 root *133u  a_inode   0,100  8838
[eventpoll]

The total count of open fds goes up to 85K (with an increased hard ulimit) for
each pod, and once it hits the hard limit the executor pod crashes.
I can see that shuffling would need more fds, but in my case the open fd count
keeps growing forever. I'm not sure how to estimate how many fds would be
adequate, or whether there is a bug.
With that uncertainty, I increased the hard ulimit to a number as large as 85k,
but no luck.
It seems like there is a file descriptor leak.

This Spark job is running with the native Kubernetes support as the Spark
cluster manager. It currently uses only two executors with 20 cores (request)
and 10GB (+6GB as memoryOverhead) of physical memory each.

Has anyone else seen a similar problem?
Thanks for any suggestions.


Error details:
Caused by: java.io.FileNotFoundException:
/tmp/blockmgr-b530983c-39be-4c6d-95aa-3ad12a507843/24/temp_shuffle_bf774cf5-fadb-4ca7-a27b-a5be7b835eb6
(Too many open files in system)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at
org.apache.spark.storage.DiskBlockObjectWriter.initialize(DiskBlockObjectWriter.scala:103)
at
org.apache.spark.storage.DiskBlockObjectWriter.open(DiskBlockObjectWriter.scala:116)
at
org.apache.spark.storage.DiskBlockObjectWriter.write(DiskBlockObjectWriter.scala:237)
at
org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)

For more error log, please follow below Github gist:

https://gist.github.com/abhisheyke/1ecd952f2ae6af20cf737308a156f566


Some details about file descriptor (lsof):
https://gist.github.com/abhisheyke/27b073e7ac805ce9e6bb33c2b011bb5a

Code Snip:
https://gist.github.com/abhisheyke/6f838adf6651491bd4f263956f403c74

Platform  Details:
Kubernets Version : 1.9.2
Docker : 17.3.2
Spark version:  2.3.1
Kafka version: 2.11-0.10.2.1 (both topics have 20 partitions and get
almost 5k records/s)
Hadoop version (Using hdfs for check pointing)  : 2.7.2

Thank you for any help.

Best Regards,
*Abhishek Tripathi*


Re: [STRUCTURED STREAM] Join static dataset in state function (flatMapGroupsWithState)

2018-07-19 Thread Gerard Maas
Hi Chris,

Could you show the code you are using? When you mention "I like to use a
static datasource (JDBC) in the state function" do you refer to a DataFrame
from a JDBC source or an independent JDBC connection?

The key point to consider is that the flatMapGroupsWithState function must
be serializable. Its execution happens in the workers of a Spark job.

If you are using a JDBC connection, you need to make sure the connection is
made in the context of the function. JDBC connections are not serializable.
Likewise, Dataset/DataFrames only function in the driver where they are
defined. They are bound to the Spark Session in the driver and it does not
make sense to access them in a remote executor.
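To make that concrete, here is a minimal sketch (not code from this thread; the table, columns, and state type are hypothetical) of doing the lookup through a plain JDBC connection opened inside the state function, on the executor, instead of capturing a driver-side Dataset:

import java.sql.DriverManager
import org.apache.spark.sql.streaming.GroupState

case class MyState(total: Long)

def updateState(key: String, rows: Iterator[String],
                state: GroupState[MyState]): Iterator[String] = {
  // The connection is opened inside the function, so it is created on the
  // executor and never has to be serialized from the driver.
  val conn = DriverManager.getConnection("jdbc:postgresql://host/db", "user", "password")
  try {
    val stmt = conn.prepareStatement("SELECT region FROM device_info WHERE device_id = ?")
    stmt.setString(1, key)
    val rs = stmt.executeQuery()
    val region = if (rs.next()) rs.getString("region") else "unknown"
    val total = state.getOption.map(_.total).getOrElse(0L) + rows.size
    state.update(MyState(total))
    Iterator(s"$key,$region,$total")
  } finally {
    conn.close()
  }
}

In practice you would want a per-executor connection pool (or a lazily initialized singleton) rather than opening a connection per group, but the serialization constraint is the same.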

Make sure you check the executor logs as well. There might be a
NullPointerException lurking somewhere in your logs.

met vriendelijke groeten, Gerard.

PS: spark-dev (d...@spark.apache.org) is for discussions about open source
development of the Spark project.
For general questions like this, use the user's mailing list
(user@spark.apache.org) (note that I changed that address in the to:).

On Thu, Jul 19, 2018 at 12:51 PM Christiaan Ras <
christiaan@semmelwise.nl> wrote:

> I use the state function flatmapgroupswithstate to track state of a kafka
> stream. To further customize the state function I like to use a static
> datasource (JDBC) in the state function. This datasource contains data I
> like to join with the stream (as Iterator) within flatmapgroupswithstate.
>
>
>
> When I try to access the JDBC source within flatmapgroupswithstate Spark
> execution freezes without any Exceptions or logging.
>
> To verify the JDBC connection works, I also tried to access the source
> outside the state function and that works. So now I join the static source
> with streaming source before feeding it to flatmapgroupswithstate. It seems
> to work so far…
>
>
>
> Any ideas why accessing the JDBC source within flatmapgroupswithstate
> could fail (freezes Spark execution)? Is it wise to use external
> datasources within flatmapgroupswithstate?
>
>
>
> Thanks,
>
> Chris
>
>
>
>
>