Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Sorry, I thought I gave an explanation.

The issue you are encountering with incorrect record numbers in the
"ShuffleWrite Size/Records" column in the Spark DAG UI when data is read
from cache/persist is a known limitation. This discrepancy arises due to
the way Spark handles and reports shuffle data when caching is involved.
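
To make the behaviour concrete, here is a minimal, self-contained sketch (row counts, column
names and the persist level are illustrative, not taken from your job) that caches a DataFrame
and then shuffles it; the stage that reads the cache reports only the records it wrote to the
shuffle, which will generally be far fewer than the cached rows:

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

object ShuffleMetricsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Shuffle metrics demo").getOrCreate()
    import spark.implicits._

    // Hypothetical input: 10 million rows, persisted before the shuffle.
    val df = spark.range(0L, 10000000L).withColumn("key", $"id" % 1000)
    df.persist(StorageLevel.MEMORY_AND_DISK)
    df.count()  // materialise the cache

    // groupBy introduces a shuffle. In the stage that reads the cache, the
    // "Shuffle Write Size / Records" column counts the rows that stage wrote to
    // shuffle files (after partial aggregation), not the 10M rows in the cache.
    df.groupBy("key").count().collect()

    spark.stop()
  }
}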

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my knowledge
but of course cannot be guaranteed. It is essential to note that, as with
any advice, "one test result is worth one-thousand expert opinions"
(Werner Von Braun).


On Sun, 26 May 2024 at 21:16, Prem Sahoo  wrote:

> Can anyone please assist me ?
>
> On Fri, May 24, 2024 at 12:29 AM Prem Sahoo  wrote:
>
>> Does anyone have a clue ?
>>
>> On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:
>>
>>> Hello Team,
>>> in spark DAG UI , we have Stages tab. Once you click on each stage you
>>> can view the tasks.
>>>
>>> In each task we have a column "ShuffleWrite Size/Records " that column
>>> prints wrong data when it gets the data from cache/persist . it
>>> typically will show the wrong record number though the data size is correct
>>> for e.g  3.2G/ 7400 which is wrong .
>>>
>>> please advise.
>>>
>>


Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Just to further clarify: the "Shuffle Write Size/Records" column in
the Spark UI can be misleading when working with cached/persisted data
because it reflects the shuffled data size and record count, not the
entire cached/persisted dataset. So it is fair to say that this is a
limitation of the UI's display, not necessarily a bug in the Spark
framework itself.

HTH

Mich Talebzadeh,

Technologist | Architect | Data Engineer  | Generative AI | FinCrime

London
United Kingdom


   view my Linkedin profile


 https://en.everybodywiki.com/Mich_Talebzadeh



Disclaimer: The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner Von Braun).




On Sun, 26 May 2024 at 16:45, Mich Talebzadeh  wrote:
>
> Yep, the Spark UI's "Shuffle Write Size/Records" column can sometimes show
> incorrect record counts when data is retrieved from cache or persisted data.
> This happens because the record count reflects the number of records written
> to disk for shuffling, not the actual number of records in the cached or
> persisted data itself. In addition, because of lazy evaluation, Spark may only
> materialize a portion of the cached or persisted data when a task needs it.
> The "Shuffle Write Size/Records" might only reflect the materialized portion,
> not the total number of records in the cache/persistence. While the "Shuffle
> Write Size/Records" might be inaccurate for cached/persisted data, the
> "Shuffle Read Size/Records" column can be more reliable. This metric shows
> the number of records read from shuffle by the following stage, which should
> be closer to the actual number of records processed.
>
> HTH
>
> Mich Talebzadeh,
>
> Technologist | Architect | Data Engineer  | Generative AI | FinCrime
>
> London
> United Kingdom
>
>
>view my Linkedin profile
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> Disclaimer: The information provided is correct to the best of my knowledge 
> but of course cannot be guaranteed . It is essential to note that, as with 
> any advice, quote "one test result is worth one-thousand expert opinions 
> (Werner Von Braun)".
>
>
>
> On Thu, 23 May 2024 at 17:45, Prem Sahoo  wrote:
>>
>> Hello Team,
>> in spark DAG UI , we have Stages tab. Once you click on each stage you can 
>> view the tasks.
>>
>> In each task we have a column "ShuffleWrite Size/Records " that column 
>> prints wrong data when it gets the data from cache/persist . it typically 
>> will show the wrong record number though the data size is correct for e.g  
>> 3.2G/ 7400 which is wrong .
>>
>> please advise.




Re: BUG :: UI Spark

2024-05-26 Thread Mich Talebzadeh
Yep, the Spark UI's "Shuffle Write Size/Records" column can sometimes show
incorrect record counts *when data is retrieved from cache or persisted
data*. This happens because the record count reflects the number of records
written to disk for shuffling, not the actual number of records in the
cached or persisted data itself. In addition, because of lazy evaluation,
Spark may only materialize a portion of the cached or persisted data when a
task needs it. The "Shuffle Write Size/Records" might only reflect the
materialized portion, not the total number of records in the
cache/persistence. While the "Shuffle Write Size/Records" might be
inaccurate for cached/persisted data, the "Shuffle Read Size/Records"
column can be more reliable. This metric shows the number of records read
from shuffle by the following stage, which should be closer to the actual
number of records processed.
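
If you want to sanity-check the UI numbers yourself, one option (a rough sketch, not an
official recipe) is a SparkListener that logs the per-task shuffle write and read record
counts, so the two sides can be compared directly:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class ShuffleRecordLogger extends SparkListener {
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val metrics = taskEnd.taskMetrics
    if (metrics != null) {
      val written = metrics.shuffleWriteMetrics.recordsWritten
      val read = metrics.shuffleReadMetrics.recordsRead
      println(s"stage ${taskEnd.stageId} task ${taskEnd.taskInfo.taskId}: " +
        s"shuffle records written=$written, read=$read")
    }
  }
}

// Register it before running the job, e.g.
//   spark.sparkContext.addSparkListener(new ShuffleRecordLogger())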

HTH

Mich Talebzadeh,
Technologist | Architect | Data Engineer  | Generative AI | FinCrime
London
United Kingdom


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* The information provided is correct to the best of my
knowledge but of course cannot be guaranteed. It is essential to note
that, as with any advice, "one test result is worth one-thousand
expert opinions" (Werner Von Braun).


On Thu, 23 May 2024 at 17:45, Prem Sahoo  wrote:

> Hello Team,
> in spark DAG UI , we have Stages tab. Once you click on each stage you can
> view the tasks.
>
> In each task we have a column "ShuffleWrite Size/Records " that column
> prints wrong data when it gets the data from cache/persist . it
> typically will show the wrong record number though the data size is correct
> for e.g  3.2G/ 7400 which is wrong .
>
> please advise.
>


Re: BUG :: UI Spark

2024-05-26 Thread Prem Sahoo
Can anyone please assist me ?

On Fri, May 24, 2024 at 12:29 AM Prem Sahoo  wrote:

> Does anyone have a clue ?
>
> On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:
>
>> Hello Team,
>> in spark DAG UI , we have Stages tab. Once you click on each stage you
>> can view the tasks.
>>
>> In each task we have a column "ShuffleWrite Size/Records " that column
>> prints wrong data when it gets the data from cache/persist . it
>> typically will show the wrong record number though the data size is correct
>> for e.g  3.2G/ 7400 which is wrong .
>>
>> please advise.
>>
>


Re: BUG :: UI Spark

2024-05-23 Thread Prem Sahoo
Does anyone have a clue ?

On Thu, May 23, 2024 at 11:40 AM Prem Sahoo  wrote:

> Hello Team,
> in spark DAG UI , we have Stages tab. Once you click on each stage you can
> view the tasks.
>
> In each task we have a column "ShuffleWrite Size/Records " that column
> prints wrong data when it gets the data from cache/persist . it
> typically will show the wrong record number though the data size is correct
> for e.g  3.2G/ 7400 which is wrong .
>
> please advise.
>


Re: Bug?

2021-02-18 Thread Tyson
I am not sure if the problem persists in 3.x.

On Thu, Feb 18, 2021 at 12:14 PM Dongjoon Hyun 
wrote:

> Thank you for sharing, Tyson.
>
> Spark 2.4.4 looks too old to me. Do you think it will occur at 3.x?
>
> Bests,
> Dongjoon.
>
>
> On Thu, Feb 18, 2021 at 11:07 AM Tyson  wrote:
>
>> We observed an interesting stack trace that I'd like to share with you.
>> The logging level is WARN, but it appears to be causing task failures.
>> Please let me know if anyone has any insights. It appears to be a integer
>> overflow issue from looking at the code in Spark 2.4.4
>>
>> WARN TaskSetManager [task-result-getter-0]: Lost task 3175.0 in stage
>> 518.0 (TID 186951, executor 150): java.lang.NegativeArraySizeException
>>
>>  at 
>> org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:438)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.UnsafeRow.getDecimal(UnsafeRow.java:414)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.JoinedRow.getDecimal(JoinedRow.scala:95)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_4$(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:218)
>>  at 
>> org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:216)
>>  at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>>  at 
>> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown
>>  Source)
>>  at 
>> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>>  at 
>> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>>  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>>  at 
>> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>>  at 
>> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>>  at org.apache.spark.scheduler.Task.run(Task.scala:123)
>>  at 
>> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>>  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>>  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>  at java.lang.Thread.run(Thread.java:748)
>>
>>
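
For reference, the integer-overflow reading of the trace can be reproduced outside Spark with
a small standalone sketch. This is not the actual UnsafeRow code, only the same decode
pattern: a field's offset and size are packed into one 64-bit word, and a length past
Int.MaxValue decodes to a negative int, so the array allocation throws
NegativeArraySizeException.

object OffsetAndSizeOverflow {
  // Pack (offset, size) into a single long, as the UnsafeRow layout does conceptually.
  def pack(offset: Int, size: Long): Long = (offset.toLong << 32) | (size & 0xFFFFFFFFL)

  def main(args: Array[String]): Unit = {
    // Hypothetical variable-length field whose accumulated size exceeds Int.MaxValue bytes.
    val offsetAndSize = pack(offset = 16, size = Int.MaxValue.toLong + 1)
    val size = offsetAndSize.toInt  // truncates to -2147483648
    println(s"decoded size = $size")
    val bytes = new Array[Byte](size)  // throws java.lang.NegativeArraySizeException
  }
}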


Re: Bug?

2021-02-18 Thread Dongjoon Hyun
Thank you for sharing, Tyson.

Spark 2.4.4 looks too old to me. Do you think it will occur at 3.x?

Bests,
Dongjoon.


On Thu, Feb 18, 2021 at 11:07 AM Tyson  wrote:

> We observed an interesting stack trace that I'd like to share with you.
> The logging level is WARN, but it appears to be causing task failures.
> Please let me know if anyone has any insights. It appears to be a integer
> overflow issue from looking at the code in Spark 2.4.4
>
> WARN TaskSetManager [task-result-getter-0]: Lost task 3175.0 in stage
> 518.0 (TID 186951, executor 150): java.lang.NegativeArraySizeException
>
>   at 
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.getBinary(UnsafeRow.java:438)
>   at 
> org.apache.spark.sql.catalyst.expressions.UnsafeRow.getDecimal(UnsafeRow.java:414)
>   at 
> org.apache.spark.sql.catalyst.expressions.JoinedRow.getDecimal(JoinedRow.scala:95)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_4$(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:218)
>   at 
> org.apache.spark.sql.execution.joins.HashJoin$$anonfun$join$1.apply(HashJoin.scala:216)
>   at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
>   at 
> org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage5.processNext(Unknown
>  Source)
>   at 
> org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
>   at 
> org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
>   at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>   at 
> org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:187)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
>   at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
>   at org.apache.spark.scheduler.Task.run(Task.scala:123)
>   at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>   at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>   at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
>
>


Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
Thanks for the tip!

But if the first thing you come across
is somebody using the trim function to strip away spaces in /etc/hostnames,
like so, from:

127.0.0.1 hostname local

to

127.0.0.1hostnamelocal

then there is a log error message showing the outcome of unnecessarily
using the trim function.

Especially when one of Spark's core capabilities is to read lines from
files separated by a space or a comma.

Also, have you seen the log4j.properties
setting of ERROR, and in one case FATAL,
used for suppressing discrepancies?

May I please draw your attention, and the attention of all in the community, to
this page, which shows turning on compiler WARNINGS before releasing
software, among other software best practices:

“The Power of 10 — NASA’s Rules for Coding” by Riccardo Giorato
https://link.medium.com/PUz88PIql3

What impression would you have?



On Sat, 28 Mar 2020, 15:50 Jeff Evans, 
wrote:

> Dude, you really need to chill. Have you ever worked with a large open
> source project before? It seems not. Even so, insinuating there are tons of
> bugs that were left uncovered until you came along (despite the fact that
> the project is used by millions across many different organizations) is
> ludicrous. Learn a little bit of humility
>
> If you're new to something, assume you have made a mistake rather than
> that there is a bug. Lurk a bit more, or even do a simple Google search,
> and you will realize Sean is a very senior committer (i.e. expert) in
> Spark, and has been for many years. He, and everyone else participating in
> these lists, is doing it voluntarily on their own time. They're not being
> paid to handhold you and quickly answer to your every whim.
>
> On Sat, Mar 28, 2020, 10:46 AM Zahid Rahman  wrote:
>
>> So the schema is limited to holding only the DEFINITION of schema. For
>> example as you say  the columns, I.e. first column User:Int 2nd column
>> String:password.
>>
>> Not location of source I.e. csv file with or without header.  SQL DB
>> tables.
>>
>> I am pleased for once I am wrong about being another bug, and it was a
>> design decision adding flexibility.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
>> wrote:
>>
>>> This is probably more of a question for the user support list, but I
>>> believe I understand the issue.
>>>
>>> Schema inside of spark refers to the structure of the output rows, for
>>> example the schema for a particular dataframe could be
>>> (User: Int, Password: String) - Two Columns the first is User of type
>>> int and the second is Password of Type String.
>>>
>>> When you pass the schema from one reader to another, you are only
>>> copyting this structure, not all of the other options associated with the
>>> dataframe.
>>> This is usually useful when you are reading from sources with different
>>> options but data that needs to be read into the same structure.
>>>
>>> The other properties such as "format" and "options" exist independently
>>> of Schema. This is helpful if I was reading from both MySQL and
>>> a comma separated file for example. While the Schema is the same, the
>>> options like ("inferSchema") do not apply to both MySql and CSV and
>>> format actually picks whether to us "JDBC" or "CSV" so copying that
>>> wouldn't be helpful either.
>>>
>>> I hope this clears things up,
>>> Russ
>>>
>>> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman 
>>> wrote:
>>>
 Hi,
 version: spark-3.0.0-preview2-bin-hadoop2.7

 As you can see from the code :

 STEP 1:  I  create a object of type static frame which holds all the
 information to the datasource (csv files).

 STEP 2: Then I create a variable  called staticSchema  assigning the
 information of the schema from the original static data frame.

 STEP 3: then I create another variable called val streamingDataFrame of
 type spark.readStream.
 and Into the .schema function parameters I pass the object staticSchema
 which is meant to hold the information to the  csv files including the
 .load(path) function etc.

 So then when I am creating val StreamingDataFrame and passing it
 .schema(staticSchema)
 the variable StreamingDataFrame  should have all the information.
 I should only have to call .option("maxFilePerTrigger",1) and not
 .format ("csv")
 .option("header","true").load("/data/retail-data/by-day/*.csv")
 Otherwise what is the point of passing .schema(staticSchema) to
 StreamingDataFrame.

 You can replicate it using the complete code below.

 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.{window,column,desc,col}

 object RetailData {

   def main(args: Array[String]): Unit = {

 // create spark session
 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
 Data").getOrCreate();
 // set spark runtime  configuration
 

Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
So the schema is limited to holding only the DEFINITION of the schema. For
example, as you say, the columns, i.e. first column User:Int, 2nd column
String:password.

Not the location of the source, i.e. a csv file with or without a header, or SQL DB
tables.

I am pleased that for once I am wrong about this being another bug; it was a
design decision that adds flexibility.









On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
wrote:

> This is probably more of a question for the user support list, but I
> believe I understand the issue.
>
> Schema inside of spark refers to the structure of the output rows, for
> example the schema for a particular dataframe could be
> (User: Int, Password: String) - Two Columns the first is User of type int
> and the second is Password of Type String.
>
> When you pass the schema from one reader to another, you are only
> copyting this structure, not all of the other options associated with the
> dataframe.
> This is usually useful when you are reading from sources with different
> options but data that needs to be read into the same structure.
>
> The other properties such as "format" and "options" exist independently of
> Schema. This is helpful if I was reading from both MySQL and
> a comma separated file for example. While the Schema is the same, the
> options like ("inferSchema") do not apply to both MySql and CSV and
> format actually picks whether to us "JDBC" or "CSV" so copying that
> wouldn't be helpful either.
>
> I hope this clears things up,
> Russ
>
> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman  wrote:
>
>> Hi,
>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>
>> As you can see from the code :
>>
>> STEP 1:  I  create a object of type static frame which holds all the
>> information to the datasource (csv files).
>>
>> STEP 2: Then I create a variable  called staticSchema  assigning the
>> information of the schema from the original static data frame.
>>
>> STEP 3: then I create another variable called val streamingDataFrame of
>> type spark.readStream.
>> and Into the .schema function parameters I pass the object staticSchema
>> which is meant to hold the information to the  csv files including the
>> .load(path) function etc.
>>
>> So then when I am creating val StreamingDataFrame and passing it
>> .schema(staticSchema)
>> the variable StreamingDataFrame  should have all the information.
>> I should only have to call .option("maxFilePerTrigger",1) and not .format
>> ("csv") .option("header","true").load("/data/retail-data/by-day/*.csv")
>> Otherwise what is the point of passing .schema(staticSchema) to
>> StreamingDataFrame.
>>
>> You can replicate it using the complete code below.
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.{window,column,desc,col}
>>
>> object RetailData {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> // create spark session
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
>> Data").getOrCreate();
>> // set spark runtime  configuration
>> spark.conf.set("spark.sql.shuffle.partitions","5")
>> 
>> spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")
>>
>> // create a static frame
>>   val staticDataFrame = spark.read.format("csv")
>> .option ("header","true")
>> .option("inferschema","true")
>> .load("/data/retail-data/by-day/*.csv")
>>
>>
>> staticDataFrame.createOrReplaceTempView("retail_data")
>> val staticSchema = staticDataFrame.schema
>>
>> staticDataFrame
>>   .selectExpr(
>> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>>   .groupBy(col("CustomerId"),
>> window(col("InvoiceDate"),
>> "1 day"))
>>   .sum("total_cost")
>>   .sort(desc("sum(total_cost)"))
>>   .show(2)
>>
>> val streamingDataFrame = spark.readStream
>>   .schema(staticSchema)
>>   .format("csv")
>>   .option("maxFilesPerTrigger", 1)
>>   .option("header","true")
>>   .load("/data/retail-data/by-day/*.csv")
>>
>>   println(streamingDataFrame.isStreaming)
>>
>> // lazy operation so we will need to call a streaming action to start 
>> the action
>> val purchaseByCustomerPerHour = streamingDataFrame
>> .selectExpr(
>>   "CustomerId",
>>   "(UnitPrice * Quantity) as total_cost",
>>   "InvoiceDate")
>> .groupBy(
>>   col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> .sum("total_cost")
>>
>> // stream action to write to console
>> purchaseByCustomerPerHour.writeStream
>>   .format("console")
>>   .queryName("customer_purchases")
>>   .outputMode("complete")
>>   .start()
>>
>>   } // main
>>
>> } // object
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> val staticSchema = staticDataFrame.schema
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>


Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
Very kind of you.

On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
wrote:

> This is probably more of a question for the user support list, but I
> believe I understand the issue.
>
> Schema inside of spark refers to the structure of the output rows, for
> example the schema for a particular dataframe could be
> (User: Int, Password: String) - Two Columns the first is User of type int
> and the second is Password of Type String.
>
> When you pass the schema from one reader to another, you are only
> copyting this structure, not all of the other options associated with the
> dataframe.
> This is usually useful when you are reading from sources with different
> options but data that needs to be read into the same structure.
>
> The other properties such as "format" and "options" exist independently of
> Schema. This is helpful if I was reading from both MySQL and
> a comma separated file for example. While the Schema is the same, the
> options like ("inferSchema") do not apply to both MySql and CSV and
> format actually picks whether to us "JDBC" or "CSV" so copying that
> wouldn't be helpful either.
>
> I hope this clears things up,
> Russ
>
> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman  wrote:
>
>> Hi,
>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>
>> As you can see from the code :
>>
>> STEP 1:  I  create a object of type static frame which holds all the
>> information to the datasource (csv files).
>>
>> STEP 2: Then I create a variable  called staticSchema  assigning the
>> information of the schema from the original static data frame.
>>
>> STEP 3: then I create another variable called val streamingDataFrame of
>> type spark.readStream.
>> and Into the .schema function parameters I pass the object staticSchema
>> which is meant to hold the information to the  csv files including the
>> .load(path) function etc.
>>
>> So then when I am creating val StreamingDataFrame and passing it
>> .schema(staticSchema)
>> the variable StreamingDataFrame  should have all the information.
>> I should only have to call .option("maxFilePerTrigger",1) and not .format
>> ("csv") .option("header","true").load("/data/retail-data/by-day/*.csv")
>> Otherwise what is the point of passing .schema(staticSchema) to
>> StreamingDataFrame.
>>
>> You can replicate it using the complete code below.
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.{window,column,desc,col}
>>
>> object RetailData {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> // create spark session
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
>> Data").getOrCreate();
>> // set spark runtime  configuration
>> spark.conf.set("spark.sql.shuffle.partitions","5")
>> 
>> spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")
>>
>> // create a static frame
>>   val staticDataFrame = spark.read.format("csv")
>> .option ("header","true")
>> .option("inferschema","true")
>> .load("/data/retail-data/by-day/*.csv")
>>
>>
>> staticDataFrame.createOrReplaceTempView("retail_data")
>> val staticSchema = staticDataFrame.schema
>>
>> staticDataFrame
>>   .selectExpr(
>> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>>   .groupBy(col("CustomerId"),
>> window(col("InvoiceDate"),
>> "1 day"))
>>   .sum("total_cost")
>>   .sort(desc("sum(total_cost)"))
>>   .show(2)
>>
>> val streamingDataFrame = spark.readStream
>>   .schema(staticSchema)
>>   .format("csv")
>>   .option("maxFilesPerTrigger", 1)
>>   .option("header","true")
>>   .load("/data/retail-data/by-day/*.csv")
>>
>>   println(streamingDataFrame.isStreaming)
>>
>> // lazy operation so we will need to call a streaming action to start 
>> the action
>> val purchaseByCustomerPerHour = streamingDataFrame
>> .selectExpr(
>>   "CustomerId",
>>   "(UnitPrice * Quantity) as total_cost",
>>   "InvoiceDate")
>> .groupBy(
>>   col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> .sum("total_cost")
>>
>> // stream action to write to console
>> purchaseByCustomerPerHour.writeStream
>>   .format("console")
>>   .queryName("customer_purchases")
>>   .outputMode("complete")
>>   .start()
>>
>>   } // main
>>
>> } // object
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> val staticSchema = staticDataFrame.schema
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>


Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Russell Spitzer
This is probably more of a question for the user support list, but I
believe I understand the issue.

Schema inside of spark refers to the structure of the output rows, for
example the schema for a particular dataframe could be
(User: Int, Password: String) - Two Columns the first is User of type int
and the second is Password of Type String.

When you pass the schema from one reader to another, you are only
copyting this structure, not all of the other options associated with the
dataframe.
This is usually useful when you are reading from sources with different
options but data that needs to be read into the same structure.

The other properties such as "format" and "options" exist independently of
Schema. This is helpful if I was reading from both MySQL and
a comma separated file for example. While the Schema is the same, the
options like ("inferSchema") do not apply to both MySql and CSV and
format actually picks whether to us "JDBC" or "CSV" so copying that
wouldn't be helpful either.
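
A small, self-contained sketch of that point (the paths are placeholders): the value returned
by .schema carries only column names and types, so the second reader still has to restate its
own format, options and load path:

import org.apache.spark.sql.SparkSession

object SchemaReuseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("Schema reuse").getOrCreate()

    // Read once as a static DataFrame and capture the inferred structure.
    val staticDataFrame = spark.read
      .format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/data/retail-data/by-day/*.csv")  // placeholder path
    val staticSchema = staticDataFrame.schema  // a StructType: names and types only

    // Reusing the structure for a streaming reader copies nothing else; format,
    // options and the path are properties of this reader and must be given again.
    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("header", "true")
      .option("maxFilesPerTrigger", 1)
      .load("/data/retail-data/by-day/*.csv")  // placeholder path

    println(streamingDataFrame.isStreaming)
    spark.stop()
  }
}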

I hope this clears things up,
Russ

On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman  wrote:

> Hi,
> version: spark-3.0.0-preview2-bin-hadoop2.7
>
> As you can see from the code :
>
> STEP 1:  I  create a object of type static frame which holds all the
> information to the datasource (csv files).
>
> STEP 2: Then I create a variable  called staticSchema  assigning the
> information of the schema from the original static data frame.
>
> STEP 3: then I create another variable called val streamingDataFrame of
> type spark.readStream.
> and Into the .schema function parameters I pass the object staticSchema
> which is meant to hold the information to the  csv files including the
> .load(path) function etc.
>
> So then when I am creating val StreamingDataFrame and passing it
> .schema(staticSchema)
> the variable StreamingDataFrame  should have all the information.
> I should only have to call .option("maxFilePerTrigger",1) and not .format
> ("csv") .option("header","true").load("/data/retail-data/by-day/*.csv")
> Otherwise what is the point of passing .schema(staticSchema) to
> StreamingDataFrame.
>
> You can replicate it using the complete code below.
>
> import org.apache.spark.sql.SparkSession
> import org.apache.spark.sql.functions.{window,column,desc,col}
>
> object RetailData {
>
>   def main(args: Array[String]): Unit = {
>
> // create spark session
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
> Data").getOrCreate();
> // set spark runtime  configuration
> spark.conf.set("spark.sql.shuffle.partitions","5")
> 
> spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")
>
> // create a static frame
>   val staticDataFrame = spark.read.format("csv")
> .option ("header","true")
> .option("inferschema","true")
> .load("/data/retail-data/by-day/*.csv")
>
>
> staticDataFrame.createOrReplaceTempView("retail_data")
> val staticSchema = staticDataFrame.schema
>
> staticDataFrame
>   .selectExpr(
> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>   .groupBy(col("CustomerId"),
> window(col("InvoiceDate"),
> "1 day"))
>   .sum("total_cost")
>   .sort(desc("sum(total_cost)"))
>   .show(2)
>
> val streamingDataFrame = spark.readStream
>   .schema(staticSchema)
>   .format("csv")
>   .option("maxFilesPerTrigger", 1)
>   .option("header","true")
>   .load("/data/retail-data/by-day/*.csv")
>
>   println(streamingDataFrame.isStreaming)
>
> // lazy operation so we will need to call a streaming action to start the 
> action
> val purchaseByCustomerPerHour = streamingDataFrame
> .selectExpr(
>   "CustomerId",
>   "(UnitPrice * Quantity) as total_cost",
>   "InvoiceDate")
> .groupBy(
>   col("CustomerId"), window(col("InvoiceDate"), "1 day"))
> .sum("total_cost")
>
> // stream action to write to console
> purchaseByCustomerPerHour.writeStream
>   .format("console")
>   .queryName("customer_purchases")
>   .outputMode("complete")
>   .start()
>
>   } // main
>
> } // object
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> val staticSchema = staticDataFrame.schema
>
>
>
>
>
>
>
>
>
>
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>


Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Zahid Rahman
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-slave.sh spark://192.168.0.38:7077
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-master.sh

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 06:12, Zahid Rahman  wrote:

> sbin/start-master.sh
> sbin/start-slave.sh spark://192.168.0.38:7077
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:
>
>> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
>> just include Spark dependency in IntelliJ?
>>
>> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman 
>> wrote:
>>
>>> I have configured  in IntelliJ as external jars
>>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>>
>>> not pulling anything from maven.
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> 
>>>
>>>
>>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>>
 Which Spark/Scala version do you use?

 On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
 wrote:

>
> with the following sparksession configuration
>
> val spark = SparkSession.builder().master("local[*]").appName("Spark 
> Session take").getOrCreate();
>
> this line works
>
> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>
> however if change the master url like so, with the ip address then the
> following error is produced by the position of .take(5)
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
>
> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
> instance of java.lang.invoke.SerializedLambda to field
> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in 
> instance
> of org.apache.spark.rdd.MapPartitionsRDD
>
> BUT if I  remove take(5) or change the position of take(5) or insert
> an extra take(5) as illustrated in code then it works. I don't see why the
> position of take(5) should cause such an error or be caused by changing 
> the
> master url
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
> + 5))
>flights.show(5)
>
>
> complete code if you wish to replicate it.
>
> import org.apache.spark.sql.SparkSession
>
> object sessiontest {
>
>   // define specific  data type class then manipulate it using the filter 
> and map functions
>   // this is also known as an Encoder
>   case class flight (DEST_COUNTRY_NAME: String,
>  ORIGIN_COUNTRY_NAME:String,
>  count: BigInt)
>
>
>   def main(args:Array[String]): Unit ={
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
> import spark.implicits._
> val flightDf = 
> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
> val flights = flightDf.as[flight]
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME 
> != "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>flights.show(5)
>
>   } // main
> }
>
>
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>



Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Zahid Rahman
sbin/start-master.sh
sbin/start-slave.sh spark://192.168.0.38:7077

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:

> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
> just include Spark dependency in IntelliJ?
>
> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:
>
>> I have configured  in IntelliJ as external jars
>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>
>> not pulling anything from maven.
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>>
>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>
>>> Which Spark/Scala version do you use?
>>>
>>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>>> wrote:
>>>

 with the following sparksession configuration

 val spark = SparkSession.builder().master("local[*]").appName("Spark 
 Session take").getOrCreate();

 this line works

 flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)


 however if change the master url like so, with the ip address then the
 following error is produced by the position of .take(5)

 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
 Session take").getOrCreate();


 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
 instance of java.lang.invoke.SerializedLambda to field
 org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
 of org.apache.spark.rdd.MapPartitionsRDD

 BUT if I  remove take(5) or change the position of take(5) or insert an
 extra take(5) as illustrated in code then it works. I don't see why the
 position of take(5) should cause such an error or be caused by changing the
 master url

 flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)

   flights.take(5)

   flights
   .take(5)
   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
 + 5))
flights.show(5)


 complete code if you wish to replicate it.

 import org.apache.spark.sql.SparkSession

 object sessiontest {

   // define specific  data type class then manipulate it using the filter 
 and map functions
   // this is also known as an Encoder
   case class flight (DEST_COUNTRY_NAME: String,
  ORIGIN_COUNTRY_NAME:String,
  count: BigInt)


   def main(args:Array[String]): Unit ={

 val spark = 
 SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
 Session take").getOrCreate();

 import spark.implicits._
 val flightDf = 
 spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
 val flights = flightDf.as[flight]

 flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
 "Canada").map(flight_row => flight_row).take(5)

   flights.take(5)

   flights
   .take(5)
   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
 fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
flights.show(5)

   } // main
 }





 Backbutton.co.uk
 ¯\_(ツ)_/¯
 ♡۶Java♡۶RMI ♡۶
 Make Use Method {MUM}
 makeuse.org
 

>>>


Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Wenchen Fan
Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
just include Spark dependency in IntelliJ?

On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:

> I have configured  in IntelliJ as external jars
> spark-3.0.0-preview2-bin-hadoop2.7/jar
>
> not pulling anything from maven.
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>
>
> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>
>> Which Spark/Scala version do you use?
>>
>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>> wrote:
>>
>>>
>>> with the following sparksession configuration
>>>
>>> val spark = SparkSession.builder().master("local[*]").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> this line works
>>>
>>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>
>>> however if change the master url like so, with the ip address then the
>>> following error is produced by the position of .take(5)
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>>
>>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
>>> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>>> instance of java.lang.invoke.SerializedLambda to field
>>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
>>> of org.apache.spark.rdd.MapPartitionsRDD
>>>
>>> BUT if I  remove take(5) or change the position of take(5) or insert an
>>> extra take(5) as illustrated in code then it works. I don't see why the
>>> position of take(5) should cause such an error or be caused by changing the
>>> master url
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
>>> 5))
>>>flights.show(5)
>>>
>>>
>>> complete code if you wish to replicate it.
>>>
>>> import org.apache.spark.sql.SparkSession
>>>
>>> object sessiontest {
>>>
>>>   // define specific  data type class then manipulate it using the filter 
>>> and map functions
>>>   // this is also known as an Encoder
>>>   case class flight (DEST_COUNTRY_NAME: String,
>>>  ORIGIN_COUNTRY_NAME:String,
>>>  count: BigInt)
>>>
>>>
>>>   def main(args:Array[String]): Unit ={
>>>
>>> val spark = 
>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>> Session take").getOrCreate();
>>>
>>> import spark.implicits._
>>> val flightDf = 
>>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>>> val flights = flightDf.as[flight]
>>>
>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>> "Canada").map(flight_row => flight_row).take(5)
>>>
>>>   flights.take(5)
>>>
>>>   flights
>>>   .take(5)
>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>>flights.show(5)
>>>
>>>   } // main
>>> }
>>>
>>>
>>>
>>>
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> 
>>>
>>


Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
I have configured  in IntelliJ as external jars
spark-3.0.0-preview2-bin-hadoop2.7/jar

not pulling anything from maven.

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:

> Which Spark/Scala version do you use?
>
> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman  wrote:
>
>>
>> with the following sparksession configuration
>>
>> val spark = SparkSession.builder().master("local[*]").appName("Spark Session 
>> take").getOrCreate();
>>
>> this line works
>>
>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>> "Canada").map(flight_row => flight_row).take(5)
>>
>>
>> however if change the master url like so, with the ip address then the
>> following error is produced by the position of .take(5)
>>
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>> Session take").getOrCreate();
>>
>>
>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
>> 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>> instance of java.lang.invoke.SerializedLambda to field
>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
>> of org.apache.spark.rdd.MapPartitionsRDD
>>
>> BUT if I  remove take(5) or change the position of take(5) or insert an
>> extra take(5) as illustrated in code then it works. I don't see why the
>> position of take(5) should cause such an error or be caused by changing the
>> master url
>>
>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>> "Canada").map(flight_row => flight_row).take(5)
>>
>>   flights.take(5)
>>
>>   flights
>>   .take(5)
>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
>> 5))
>>flights.show(5)
>>
>>
>> complete code if you wish to replicate it.
>>
>> import org.apache.spark.sql.SparkSession
>>
>> object sessiontest {
>>
>>   // define specific  data type class then manipulate it using the filter 
>> and map functions
>>   // this is also known as an Encoder
>>   case class flight (DEST_COUNTRY_NAME: String,
>>  ORIGIN_COUNTRY_NAME:String,
>>  count: BigInt)
>>
>>
>>   def main(args:Array[String]): Unit ={
>>
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>> Session take").getOrCreate();
>>
>> import spark.implicits._
>> val flightDf = 
>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>> val flights = flightDf.as[flight]
>>
>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>> "Canada").map(flight_row => flight_row).take(5)
>>
>>   flights.take(5)
>>
>>   flights
>>   .take(5)
>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>flights.show(5)
>>
>>   } // main
>> }
>>
>>
>>
>>
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> 
>>
>


Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Wenchen Fan
Which Spark/Scala version do you use?

On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman  wrote:

>
> with the following sparksession configuration
>
> val spark = SparkSession.builder().master("local[*]").appName("Spark Session 
> take").getOrCreate();
>
> this line works
>
> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>
> however if change the master url like so, with the ip address then the
> following error is produced by the position of .take(5)
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
>
> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
> 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
> instance of java.lang.invoke.SerializedLambda to field
> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
> of org.apache.spark.rdd.MapPartitionsRDD
>
> BUT if I  remove take(5) or change the position of take(5) or insert an
> extra take(5) as illustrated in code then it works. I don't see why the
> position of take(5) should cause such an error or be caused by changing the
> master url
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
> 5))
>flights.show(5)
>
>
> complete code if you wish to replicate it.
>
> import org.apache.spark.sql.SparkSession
>
> object sessiontest {
>
>   // define specific  data type class then manipulate it using the filter and 
> map functions
>   // this is also known as an Encoder
>   case class flight (DEST_COUNTRY_NAME: String,
>  ORIGIN_COUNTRY_NAME:String,
>  count: BigInt)
>
>
>   def main(args:Array[String]): Unit ={
>
> val spark = 
> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
> Session take").getOrCreate();
>
> import spark.implicits._
> val flightDf = 
> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
> val flights = flightDf.as[flight]
>
> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
> "Canada").map(flight_row => flight_row).take(5)
>
>   flights.take(5)
>
>   flights
>   .take(5)
>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
> + 5))
>flights.show(5)
>
>   } // main
> }
>
>
>
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> 
>


Re: bug in Worker.scala, ExecutorRunner is not serializable

2015-09-17 Thread Sean Owen
Did this cause an error for you?

On Thu, Sep 17, 2015, 8:51 AM Huangguowei  wrote:

>
>
> In Worker.scala line 480:
>
>
>
> case RequestWorkerState =>
>
>   sender ! WorkerStateResponse(host, port, workerId,
> executors.values.toList,
>
> finishedExecutors.values.toList, drivers.values.toList,
>
> finishedDrivers.values.toList, activeMasterUrl, cores, memory,
>
> coresUsed, memoryUsed, activeMasterWebUiUrl)
>
>
>
> The executors’s type is:
>
> val executors = new HashMap[String, ExecutorRunner]
>
>
>
> but ExecutorRunner cannot be Serialized, so if ask RequestWorkerState will
> cause java.io.NotSerializableException.
>
>
>
>
>
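
For reference, the failure described above can be reproduced outside Spark with plain JDK
serialization; the class names below are stand-ins for illustration, not the real Worker
types:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

object NotSerializableSketch {
  // Stand-in for ExecutorRunner: holds live process handles and is not Serializable.
  class RunnerLike(val appId: String)

  // Stand-in for WorkerStateResponse: a case class (itself serializable) carrying the runners.
  case class StateResponse(executors: List[RunnerLike])

  def main(args: Array[String]): Unit = {
    val msg = StateResponse(List(new RunnerLike("app-1")))
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    out.writeObject(msg)  // throws java.io.NotSerializableException for RunnerLike
  }
}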


re: bug in Worker.scala, ExecutorRunner is not serializable

2015-09-17 Thread Huangguowei

Is it possible to get Executors status when running an application?

From: Sean Owen [mailto:so...@cloudera.com]
Sent: 17 September 2015 15:54
To: Huangguowei; Dev
Subject: Re: bug in Worker.scala, ExecutorRunner is not serializable


Did this cause an error for you?

On Thu, Sep 17, 2015, 8:51 AM Huangguowei 
<huangguo...@huawei.com> wrote:

In Worker.scala line 480:

case RequestWorkerState =>
  sender ! WorkerStateResponse(host, port, workerId, 
executors.values.toList,
finishedExecutors.values.toList, drivers.values.toList,
finishedDrivers.values.toList, activeMasterUrl, cores, memory,
coresUsed, memoryUsed, activeMasterWebUiUrl)

The executors’s type is:
val executors = new HashMap[String, ExecutorRunner]

but ExecutorRunner cannot be Serialized, so if ask RequestWorkerState will 
cause java.io.NotSerializableException.




Re: bug in Worker.scala, ExecutorRunner is not serializable

2015-09-17 Thread Shixiong Zhu
RequestWorkerState is an internal message between Worker and WorkerWebUI.
Since they are in the same process, that's fine. Actually, these are not
public APIs. Could you elaborate your use case?

Best Regards,
Shixiong Zhu

2015-09-17 16:36 GMT+08:00 Huangguowei <huangguo...@huawei.com>:

>
>
> Is it possible to get Executors status when running an application?
>
>
>
> *From:* Sean Owen [mailto:so...@cloudera.com]
> *Sent:* 17 September 2015 15:54
> *To:* Huangguowei; Dev
> *Subject:* Re: bug in Worker.scala, ExecutorRunner is not serializable
>
>
>
> Did this cause an error for you?
>
>
>
> On Thu, Sep 17, 2015, 8:51 AM Huangguowei <huangguo...@huawei.com> wrote:
>
>
>
> In Worker.scala line 480:
>
>
>
> case RequestWorkerState =>
>
>   sender ! WorkerStateResponse(host, port, workerId,
> executors.values.toList,
>
> finishedExecutors.values.toList, drivers.values.toList,
>
> finishedDrivers.values.toList, activeMasterUrl, cores, memory,
>
> coresUsed, memoryUsed, activeMasterWebUiUrl)
>
>
>
> The executors’s type is:
>
> val executors = new HashMap[String, ExecutorRunner]
>
>
>
> but ExecutorRunner cannot be Serialized, so if ask RequestWorkerState will
> cause java.io.NotSerializableException.
>
>
>
>
>
>


Re: BUG: 1.3.0 org.apache.spark.sql.Row Does not exist in Java API

2015-04-18 Thread Olivier Girardot
Hi Nipun,
you're right, I created the pull request fixing the documentation:
https://github.com/apache/spark/pull/5569
and the corresponding issue:
https://issues.apache.org/jira/browse/SPARK-6992
Thank you for your time,

Olivier.

On Sat, 18 Apr 2015 at 01:11, Nipun Batra batrani...@gmail.com wrote:

 Hi Oliver

 Thank you for responding.

 I am able to find org.apache.spark.sql.Row in spark-catalyst_2.10-1.3.0,
 BUT it was not visible in API document yesterday (
 https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/package-frame.html).
 I am pretty sure.

 Also I think this document needs to be changed '
 https://spark.apache.org/docs/latest/sql-programming-guide.html'

 return Row.create(fields[0], fields[1].trim());


 needs to be replaced with RowFactory.create.

 Thanks again for your reponse.

 Thanks
 Nipun Batra



 On Fri, Apr 17, 2015 at 2:50 PM, Olivier Girardot ssab...@gmail.com
 wrote:

 Hi Nipun,
 I'm sorry but I don't understand exactly what your problem is ?
 Regarding the org.apache.spark.sql.Row, it does exists in the Spark SQL
 dependency.
 Is it a compilation problem ?
 Are you trying to run a main method using the pom you've just described ?
 or are you trying to spark-submit the jar ?
 If you're trying to run a main method, the scope provided is not designed
 for that and will make your program fail.

 Regards,

 Olivier.

 On Fri, 17 Apr 2015 at 21:52, Nipun Batra bni...@gmail.com wrote:

 Hi

 The example given in SQL document
 https://spark.apache.org/docs/latest/sql-programming-guide.html

 org.apache.spark.sql.Row Does not exist in Java API or atleast I was not
 able to find it.

 Build Info - Downloaded from spark website

 Dependency

 <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.10</artifactId>
   <version>1.3.0</version>
   <scope>provided</scope>
 </dependency>

 Code in documentation

 // Import factory methods provided by DataType.
 import org.apache.spark.sql.types.DataType;
 // Import StructType and StructField
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructField;
 // Import Row.
 import org.apache.spark.sql.Row;

 // sc is an existing JavaSparkContext.
 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

 // Load a text file and convert each line to a JavaBean.
 JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

 // The schema is encoded in a string
 String schemaString = "name age";

 // Generate the schema based on the string of schema
 List<StructField> fields = new ArrayList<StructField>();
 for (String fieldName: schemaString.split(" ")) {
   fields.add(DataType.createStructField(fieldName, DataType.StringType, true));
 }
 StructType schema = DataType.createStructType(fields);

 // Convert records of the RDD (people) to Rows.
 JavaRDD<Row> rowRDD = people.map(
   new Function<String, Row>() {
     public Row call(String record) throws Exception {
       String[] fields = record.split(",");
       return Row.create(fields[0], fields[1].trim());
     }
   });

 // Apply the schema to the RDD.
 DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

 // Register the DataFrame as a table.
 peopleDataFrame.registerTempTable("people");

 // SQL can be run over RDDs that have been registered as tables.
 DataFrame results = sqlContext.sql("SELECT name FROM people");

 // The results of SQL queries are DataFrames and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
 List<String> names = results.map(new Function<Row, String>() {
   public String call(Row row) {
     return "Name: " + row.getString(0);
   }
 }).collect();


 Thanks
 Nipun





Re: BUG: 1.3.0 org.apache.spark.sql.Row Does not exist in Java API

2015-04-17 Thread Olivier Girardot
Hi Nipun,
I'm sorry, but I don't understand exactly what your problem is.
Regarding org.apache.spark.sql.Row, it does exist in the Spark SQL
dependency.
Is it a compilation problem?
Are you trying to run a main method using the pom you've just described,
or are you trying to spark-submit the jar?
If you're trying to run a main method, the "provided" scope is not designed
for that and will make your program fail.

Regards,

Olivier.

On Fri, 17 Apr 2015 at 21:52, Nipun Batra bni...@gmail.com wrote:

 Hi

 The example given in SQL document
 https://spark.apache.org/docs/latest/sql-programming-guide.html

 org.apache.spark.sql.Row Does not exist in Java API or atleast I was not
 able to find it.

 Build Info - Downloaded from spark website

 Dependency

 <dependency>
   <groupId>org.apache.spark</groupId>
   <artifactId>spark-sql_2.10</artifactId>
   <version>1.3.0</version>
   <scope>provided</scope>
 </dependency>

 Code in documentation

 // Import factory methods provided by DataType.
 import org.apache.spark.sql.types.DataType;
 // Import StructType and StructField
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.types.StructField;
 // Import Row.
 import org.apache.spark.sql.Row;

 // sc is an existing JavaSparkContext.
 SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

 // Load a text file and convert each line to a JavaBean.
 JavaRDD<String> people = sc.textFile("examples/src/main/resources/people.txt");

 // The schema is encoded in a string
 String schemaString = "name age";

 // Generate the schema based on the string of schema
 List<StructField> fields = new ArrayList<StructField>();
 for (String fieldName: schemaString.split(" ")) {
   fields.add(DataType.createStructField(fieldName, DataType.StringType, true));
 }
 StructType schema = DataType.createStructType(fields);

 // Convert records of the RDD (people) to Rows.
 JavaRDD<Row> rowRDD = people.map(
   new Function<String, Row>() {
     public Row call(String record) throws Exception {
       String[] fields = record.split(",");
       return Row.create(fields[0], fields[1].trim());
     }
   });

 // Apply the schema to the RDD.
 DataFrame peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema);

 // Register the DataFrame as a table.
 peopleDataFrame.registerTempTable("people");

 // SQL can be run over RDDs that have been registered as tables.
 DataFrame results = sqlContext.sql("SELECT name FROM people");

 // The results of SQL queries are DataFrames and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
 List<String> names = results.map(new Function<Row, String>() {
   public String call(Row row) {
     return "Name: " + row.getString(0);
   }
 }).collect();


 Thanks
 Nipun



Re: BUG: graph.triplets does not return proper values

2014-05-20 Thread GlennStrycker
Oh... ha, good point.  Sorry, I'm new to mapreduce programming and forgot
about that... I'll have to adjust my reduce function to output a vector/RDD
as the element to return.  Thanks for reminding me of this!



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6717.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: BUG: graph.triplets does not return proper values

2014-05-20 Thread GlennStrycker
Wait a minute... doesn't a reduce function return 1 element PER key pair? 
For example, word-count mapreduce functions return a {word, count} element
for every unique word.  Is this supposed to be a 1-element RDD object?

The .reduce function for both MappedRDD and FlatMappedRDD is of the form

def reduce(f: (T, T) => T): T

So presumably if I pass the reduce function a list of values {(X,1), (X,1),
(X,1), (Y,1), (Y,1)} and the function is ( (A,B) => (A._1, A._2+B._2 ) ),
then I should get a final vector of {(X,3), (Y,2)}, correct?


I have the following object:

scala> temp3
res128: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.Edge[Int],
Int)] = MappedRDD[107] at map at <console>:27

and it contains the following:

scala> temp3.collect
. . .
res129: Array[(org.apache.spark.graphx.Edge[Int], Int)] =
Array((Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(4,4,1),1), (Edge(5,4,1),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,4,1),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(4,5,1),1), (Edge(5,5,1),1), (Edge(1,2,1),1), (Edge(1,3,1),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,5,1),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
(Edge(4,7,1),1), (Edge(5,7,1),1), (Edge(0,0,0),1), (E...

but when I run the following, I only get one element in the final vector:

scala> temp3.reduce( (A,B) => (A._1, A._2+B._2 ) )
. . .
res130: (org.apache.spark.graphx.Edge[Int], Int) = (Edge(0,0,0),256)

I should be additionally getting { (Edge(1,2,1),1), (Edge(1,3,1),2),
(Edge(2,3,1),2), (Edge(4,5,1),1), (Edge(5,6,1),2), (Edge(6,7,1),1),
(Edge(4,7,1),1), (Edge(5,7,1),2) }



Am I not mapping something correctly before running reduce?  I've tried both
.map and .flatMap, and put in _.copy() everywhere, e.g.

temp3.flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )
temp3.map(_.copy()).flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 )
)
etc.





--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6726.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: BUG: graph.triplets does not return proper values

2014-05-20 Thread Reynold Xin
You are probably looking for reduceByKey in that case.

reduce just reduces everything in the collection into a single element.
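
A quick sketch of the difference, assuming sc is an existing SparkContext
(in 0.9.x you also need the SparkContext._ import for reduceByKey):

import org.apache.spark.SparkContext._   // implicit conversion to PairRDDFunctions

val pairs = sc.parallelize(Seq(("X", 1), ("X", 1), ("X", 1), ("Y", 1), ("Y", 1)))

// reduce folds the whole RDD down to ONE value; only the summed count is
// meaningful here, the key that survives is whatever the fold happens to keep.
pairs.reduce((a, b) => (a._1, a._2 + b._2))   // e.g. (X,5)

// reduceByKey keeps one result per distinct key.
pairs.reduceByKey(_ + _).collect()            // Array((X,3), (Y,2))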


On Tue, May 20, 2014 at 12:16 PM, GlennStrycker glenn.stryc...@gmail.comwrote:

 Wait a minute... doesn't a reduce function return 1 element PER key pair?
 For example, word-count mapreduce functions return a {word, count} element
 for every unique word.  Is this supposed to be a 1-element RDD object?

 The .reduce function for both MappedRDD and FlatMappedRDD is of the form

 def reduce(f: (T, T) => T): T

 So presumably if I pass the reduce function a list of values {(X,1), (X,1),
 (X,1), (Y,1), (Y,1)} and the function is ( (A,B) => (A._1, A._2+B._2 ) ),
 then I should get a final vector of {(X,3), (Y,2)}, correct?


 I have the following object:

 scala> temp3
 res128: org.apache.spark.rdd.RDD[(org.apache.spark.graphx.Edge[Int],
 Int)] = MappedRDD[107] at map at <console>:27

 and it contains the following:

 scala> temp3.collect
 . . .
 res129: Array[(org.apache.spark.graphx.Edge[Int], Int)] =
 Array((Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
 (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
 (Edge(4,4,1),1), (Edge(5,4,1),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
 (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,4,1),1), (Edge(0,0,0),1),
 (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
 (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
 (Edge(4,5,1),1), (Edge(5,5,1),1), (Edge(1,2,1),1), (Edge(1,3,1),1),
 (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(7,5,1),1), (Edge(0,0,0),1),
 (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
 (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1), (Edge(0,0,0),1),
 (Edge(4,7,1),1), (Edge(5,7,1),1), (Edge(0,0,0),1), (E...

 but when I run the following, I only get one element in the final vector:

 scala> temp3.reduce( (A,B) => (A._1, A._2+B._2 ) )
 . . .
 res130: (org.apache.spark.graphx.Edge[Int], Int) = (Edge(0,0,0),256)

 I should be additionally getting { (Edge(1,2,1),1), (Edge(1,3,1),2),
 (Edge(2,3,1),2), (Edge(4,5,1),1), (Edge(5,6,1),2), (Edge(6,7,1),1),
 (Edge(4,7,1),1), (Edge(5,7,1),2) }



 Am I not mapping something correctly before running reduce?  I've tried
 both
 .map and .flatMap, and put in _.copy() everywhere, e.g.

 temp3.flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2 ) )
 temp3.map(_.copy()).flatMap(A => Seq(A)).reduce( (A,B) => (A._1, A._2+B._2
 )
 )
 etc.





 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6726.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.



Re: BUG: graph.triplets does not return proper values

2014-05-20 Thread GlennStrycker
I don't seem to have this function in my Spark installation for this object,
or the classes MappedRDD, FlatMappedRDD, EdgeRDD, VertexRDD, or Graph.

Which class should have the reduceByKey function, and how do I cast my
current RDD as this class?

Perhaps this is still due to my Spark installation being out-of-date?



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6728.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: BUG: graph.triplets does not return proper values

2014-05-20 Thread Mark Hamstra
That's all very old functionality in Spark terms, so it shouldn't have
anything to do with your installation being out-of-date.  There is also no
need to cast as long as the relevant implicit conversions are in scope:
import org.apache.spark.SparkContext._
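
Applied to the RDD from earlier in the thread (a sketch, assuming temp3 is
the RDD[(Edge[Int], Int)] shown above):

import org.apache.spark.SparkContext._    // brings reduceByKey into scope on pair RDDs

val edgeCounts = temp3.reduceByKey(_ + _) // one (edge, count) pair per distinct edge
edgeCounts.collect()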


On Tue, May 20, 2014 at 1:00 PM, GlennStrycker glenn.stryc...@gmail.comwrote:

 I don't seem to have this function in my Spark installation for this
 object,
 or the classes MappedRDD, FlatMappedRDD, EdgeRDD, VertexRDD, or Graph.

 Which class should have the reduceByKey function, and how do I cast my
 current RDD as this class?

 Perhaps this is still due to my Spark installation being out-of-date?



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6728.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.



Re: BUG: graph.triplets does not return proper values

2014-05-20 Thread Sean Owen
http://spark.apache.org/docs/0.9.1/api/core/index.html#org.apache.spark.rdd.PairRDDFunctions

It becomes automagically available when your RDD contains pairs.

On Tue, May 20, 2014 at 9:00 PM, GlennStrycker glenn.stryc...@gmail.com wrote:
 I don't seem to have this function in my Spark installation for this object,
 or the classes MappedRDD, FlatMappedRDD, EdgeRDD, VertexRDD, or Graph.

 Which class should have the reduceByKey function, and how do I cast my
 current RDD as this class?

 Perhaps this is still due to my Spark installation being out-of-date?



 --
 View this message in context: 
 http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6728.html
 Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: BUG: graph.triplets does not return proper values

2014-05-20 Thread GlennStrycker
For some reason it does not appear when I hit tab in Spark shell, but when
I put everything together in one line, it DOES WORK!

orig_graph.edges.map(_.copy()).cartesian(orig_graph.edges.map(_.copy())).flatMap(
A => Seq(if (A._1.srcId == A._2.dstId) Edge(A._2.srcId,A._1.dstId,1) else if
(A._1.dstId == A._2.srcId) Edge(A._1.srcId,A._2.dstId,1) else Edge(0,0,0) )
).map(word => (word, 1)).reduceByKey(_ + _).collect

= Array((Edge(5,7,1),4), (Edge(5,6,1),4), (Edge(3,2,1),4), (Edge(5,5,1),3),
(Edge(1,3,1),4), (Edge(2,3,1),4), (Edge(6,5,1),4), (Edge(5,4,1),2),
(Edge(2,1,1),2), (Edge(6,7,1),2), (Edge(2,2,1),2), (Edge(7,5,1),4),
(Edge(3,1,1),4), (Edge(4,5,1),2), (Edge(0,0,0),192), (Edge(3,3,1),3),
(Edge(4,7,1),2), (Edge(1,2,1),2), (Edge(4,4,1),1), (Edge(6,6,1),2),
(Edge(7,4,1),2), (Edge(7,6,1),2), (Edge(7,7,1),2), (Edge(1,1,1),3))




--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6730.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: BUG: graph.triplets does not return proper values

2014-05-19 Thread Reynold Xin
This was an optimization that reuses a triplet object in GraphX, and when
you do a collect directly on triplets, the same object is returned.

It has been fixed in Spark 1.0 here:
https://issues.apache.org/jira/browse/SPARK-1188

To work around it in older versions of Spark, you can add a copy step,
e.g.

graph.triplets.map(_.copy()).collect()



On Mon, May 19, 2014 at 1:09 PM, GlennStrycker glenn.stryc...@gmail.comwrote:

 graph.triplets does not work -- it returns incorrect results

 I have a graph with the following edges:

 orig_graph.edges.collect
 =  Array(Edge(1,4,1), Edge(1,5,1), Edge(1,7,1), Edge(2,5,1), Edge(2,6,1),
 Edge(3,5,1), Edge(3,6,1), Edge(3,7,1), Edge(4,1,1), Edge(5,1,1),
 Edge(5,2,1), Edge(5,3,1), Edge(6,2,1), Edge(6,3,1), Edge(7,1,1),
 Edge(7,3,1))

 When I run triplets.collect, I only get the last edge repeated 16 times:

 orig_graph.triplets.collect
 = Array(((7,1),(3,1),1), ((7,1),(3,1),1), ((7,1),(3,1),1), ((7,1),(3,1),1),
 ((7,1),(3,1),1), ((7,1),(3,1),1), ((7,1),(3,1),1), ((7,1),(3,1),1),
 ((7,1),(3,1),1), ((7,1),(3,1),1), ((7,1),(3,1),1), ((7,1),(3,1),1),
 ((7,1),(3,1),1), ((7,1),(3,1),1), ((7,1),(3,1),1), ((7,1),(3,1),1))

 I've also tried writing various map steps first before calling the triplet
 function, but I get the same results as above.

 Similarly, the example on the graphx programming guide page
 (http://spark.apache.org/docs/0.9.0/graphx-programming-guide.html) is
 incorrect.

 val facts: RDD[String] =
   graph.triplets.map(triplet =>
     triplet.srcAttr._1 + " is the " + triplet.attr + " of " +
     triplet.dstAttr._1)

 does not work, but

 val facts: RDD[String] =
   graph.triplets.map(triplet =>
     triplet.srcAttr + " is the " + triplet.attr + " of " + triplet.dstAttr)

 does work, although the results are meaningless.  For my graph example, I
 get the following line repeated 16 times:

 1 is the 1 of 1



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.



Re: BUG: graph.triplets does not return proper values

2014-05-19 Thread GlennStrycker
Thanks, rxin, this worked!

I am having a similar problem with .reduce... do I need to insert .copy()
functions in that statement as well?

This part works:
orig_graph.edges.map(_.copy()).flatMap(edge => Seq(edge) ).map(edge =>
(Edge(edge.copy().srcId, edge.copy().dstId, edge.copy().attr), 1)).collect

=Array((Edge(1,4,1),1), (Edge(1,5,1),1), (Edge(1,7,1),1), (Edge(2,5,1),1),
(Edge(2,6,1),1), (Edge(3,5,1),1), (Edge(3,6,1),1), (Edge(3,7,1),1),
(Edge(4,1,1),1), (Edge(5,1,1),1), (Edge(5,2,1),1), (Edge(5,3,1),1),
(Edge(6,2,1),1), (Edge(6,3,1),1), (Edge(7,1,1),1), (Edge(7,3,1),1))

But when I try adding on a reduce statement, I only get one element, not 16:
orig_graph.edges.map(_.copy()).flatMap(edge => Seq(edge) ).map(edge =>
(Edge(edge.copy().srcId, edge.copy().dstId, edge.copy().attr), 1)).reduce(
(A,B) => { if (A._1.dstId == B._1.srcId) (Edge(A._1.srcId, B._1.dstId, 2),
1) else if (A._1.srcId == B._1.dstId) (Edge(B._1.srcId, A._1.dstId, 2), 1)
else (Edge(0, 0, 0), 0) } )

=(Edge(0,0,0),0)



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6695.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: BUG: graph.triplets does not return proper values

2014-05-19 Thread GlennStrycker
I tried adding .copy() everywhere, but still only get one element returned,
not even an RDD object.

orig_graph.edges.map(_.copy()).flatMap(edge => Seq(edge) ).map(edge =>
(Edge(edge.copy().srcId, edge.copy().dstId, edge.copy().attr), 1)).reduce(
(A,B) => { if (A._1.copy().dstId == B._1.copy().srcId)
(Edge(A._1.copy().srcId, B._1.copy().dstId, 2), 1) else if
(A._1.copy().srcId == B._1.copy().dstId) (Edge(B._1.copy().srcId,
A._1.copy().dstId, 2), 1) else (Edge(0, 0, 3), 1) } )

= (Edge(0,0,3),1)

I'll try getting a fresh copy of the Spark 1.0 code and see if I can get it
to work.  Thanks for your help!!



--
View this message in context: 
http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6697.html
Sent from the Apache Spark Developers List mailing list archive at Nabble.com.


Re: BUG: graph.triplets does not return proper values

2014-05-19 Thread Reynold Xin
reduce always returns a single element - maybe you are misunderstanding what
the reduce function in collections does.


On Mon, May 19, 2014 at 3:32 PM, GlennStrycker glenn.stryc...@gmail.comwrote:

 I tried adding .copy() everywhere, but still only get one element returned,
 not even an RDD object.

 orig_graph.edges.map(_.copy()).flatMap(edge => Seq(edge) ).map(edge =>
 (Edge(edge.copy().srcId, edge.copy().dstId, edge.copy().attr), 1)).reduce(
 (A,B) => { if (A._1.copy().dstId == B._1.copy().srcId)
 (Edge(A._1.copy().srcId, B._1.copy().dstId, 2), 1) else if
 (A._1.copy().srcId == B._1.copy().dstId) (Edge(B._1.copy().srcId,
 A._1.copy().dstId, 2), 1) else (Edge(0, 0, 3), 1) } )

 = (Edge(0,0,3),1)

 I'll try getting a fresh copy of the Spark 1.0 code and see if I can get it
 to work.  Thanks for your help!!



 --
 View this message in context:
 http://apache-spark-developers-list.1001551.n3.nabble.com/BUG-graph-triplets-does-not-return-proper-values-tp6693p6697.html
 Sent from the Apache Spark Developers List mailing list archive at
 Nabble.com.



Re: Bug is KryoSerializer under Mesos [work-around included]

2014-05-15 Thread Soren Macbeth
Hi Matei,

Yes, I'm 100% positive the jar on the executors is the same version. I am
building everything and deploying myself. Additionally, while debugging the
issue, I forked spark's git repo and added additional logging, which I
could see in the driver and executors. These debugging jars exhibited the
same behaviour.

I agree that the code being called in the executors and the driver *should*
be the same, but I found that not to be the case. I forgot to mention that
this issue only exhibits itself under mesos; running in local mode or a
standalone cluster (with a single worker, all processes running on my
laptop) I wasn't able to reproduce the issue.

The classes in question should get registered by chill-scala's
AllScalaRegistrar here:

https://github.com/twitter/chill/blob/0.3.6/chill-scala/src/main/scala/com/twitter/chill/ScalaKryoInstantiator.scala#L166

but, for a reason I haven't tracked down, they don't in my mesos executor.
I don't really have a way to test if this is an issue specific only to my
mesos cluster, or if it exhibits in all mesos clusters.

fwiw, I am running spark-0.9.1 with hadoop 2.0.0-mr1-cdh4.6.0 under mesos
0.18.1
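
For anyone hitting the same thing, a minimal sketch of a registrator that pins
these wrapper classes explicitly (the class name and wiring below are
hypothetical; the actual workaround is the set of kryo.register calls quoted
further down):

import com.esotericsoftware.kryo.Kryo
import org.apache.spark.serializer.KryoRegistrator

// Registers the wrappers by their binary names so the executors do not depend
// on chill-scala's AllScalaRegistrar having run.
class WrappersRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    Seq(
      "scala.collection.convert.Wrappers$IteratorWrapper",
      "scala.collection.convert.Wrappers$SeqWrapper",
      "scala.collection.convert.Wrappers$MapWrapper",
      "scala.collection.convert.Wrappers$JListWrapper",
      "scala.collection.convert.Wrappers$JMapWrapper"
    ).foreach(name => kryo.register(Class.forName(name)))
  }
}

// wired up via spark.kryo.registrator, e.g.
//   conf.set("spark.kryo.registrator", "com.example.WrappersRegistrator")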


On Mon, May 12, 2014 at 12:02 PM, Matei Zaharia matei.zaha...@gmail.comwrote:

 Hey Soren, are you sure that the JAR you used on the executors is for the
 right version of Spark? Maybe they’re running an older version. The Kryo
 serializer should be initialized the same way on both.

 Matei

 On May 12, 2014, at 10:39 AM, Soren Macbeth so...@yieldbot.com wrote:

  I finally managed to track down the source of the kryo issues that I was
  having under mesos.
 
  What happens is that, for a reason I haven't tracked down yet, a handful
  of the scala collection classes from chill-scala don't get registered by
  the mesos executors, but they do all get registered in the driver process.
 
  This led to scala.Some classes which were serialized by the executors
 being
  incorrectly deserialized as scala.collections.Wrappers$SeqWrapper in
 driver
  during task deserialization, causing a KryoException.
 
  I resolved this issue in my spark job by explicitly registering the
 classes
  in my Registrator like so:
 
 
  kryo.register(scala.collection.convert.Wrappers.IteratorWrapper.class);
   kryo.register(scala.collection.convert.Wrappers.SeqWrapper.class);
   kryo.register(scala.collection.convert.Wrappers.MapWrapper.class);
   kryo.register(scala.collection.convert.Wrappers.JListWrapper.class);
   kryo.register(scala.collection.convert.Wrappers.JMapWrapper.class);
 
  Again, I'm not sure why they don't get registered in the mesos executors,
  but I wanted to report wht I found as well as a workaround in case anyone
  else hit this (extraordinarily frustrating) issue again.
 
  Some interactive debugging note are available in this gist:
 
  https://gist.github.com/sorenmacbeth/28707a7a973f7a1982dc
 
  Cheers,
  Soren




Re: Bug is KryoSerializer under Mesos [work-around included]

2014-05-12 Thread Matei Zaharia
Hey Soren, are you sure that the JAR you used on the executors is for the right 
version of Spark? Maybe they’re running an older version. The Kryo serializer 
should be initialized the same way on both.

Matei

On May 12, 2014, at 10:39 AM, Soren Macbeth so...@yieldbot.com wrote:

 I finally managed to track down the source of the kryo issues that I was
 having under mesos.
 
 What happens is that, for a reason I haven't tracked down yet, a handful
 of the scala collection classes from chill-scala don't get registered by the
 mesos executors, but they do all get registered in the driver process.
 
 This led to scala.Some classes which were serialized by the executors being
 incorrectly deserialized as scala.collections.Wrappers$SeqWrapper in driver
 during task deserialization, causing a KryoException.
 
 I resolved this issue in my spark job by explicitly registering the classes
 in my Registrator like so:
 
 
 kryo.register(scala.collection.convert.Wrappers.IteratorWrapper.class);
  kryo.register(scala.collection.convert.Wrappers.SeqWrapper.class);
  kryo.register(scala.collection.convert.Wrappers.MapWrapper.class);
  kryo.register(scala.collection.convert.Wrappers.JListWrapper.class);
  kryo.register(scala.collection.convert.Wrappers.JMapWrapper.class);
 
 Again, I'm not sure why they don't get registered in the mesos executors,
 but I wanted to report what I found as well as a workaround in case anyone
 else hit this (extraordinarily frustrating) issue again.
 
 Some interactive debugging note are available in this gist:
 
 https://gist.github.com/sorenmacbeth/28707a7a973f7a1982dc
 
 Cheers,
 Soren



Re: bug using kryo as closure serializer

2014-05-04 Thread Reynold Xin
I added the config option to use the non-default serializer. However, at
the time, Kryo failed to serialize pretty much any closures, so that option
was never really used / recommended.

Since then the Scala ecosystem has developed, and some other projects are
starting to use Kryo to serialize more Scala data structures, so I wouldn't
be surprised if there is a way to work around this now. However, I don't
have enough time to look into it at this point. If you do, please do post
your findings. Thanks.
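
For anyone following along, a sketch of the settings being discussed; the
registrator class name is the one from the original post, the rest are
placeholders:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("kryo-closure-test")
  .setMaster("local[*]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // the setting in question -- serializes closures with Kryo instead of Java
  .set("spark.closure.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "pickles.kryo.PicklesRegistrator")
val sc = new SparkContext(conf)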



On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com wrote:

 apologies for the cross-list posts, but I've gotten zero response in the
 user list and I guess this list is probably more appropriate.

 According to the documentation, using the KryoSerializer for closures is
 supported. However, when I try to set `spark.closure.serializer` to
 `org.apache.spark.serializer.KryoSerializer` things fail pretty miserably.

 The first thing that happens is that it throws exceptions over and over
 that it cannot locate my registrator class, which is located in my assembly
 jar like so:

 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run
 spark.kryo.registrator
 java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
 at

 org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:116)
 at

 org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)

 Now, I would expect it not to be able to find this class since it hasn't
 yet fetched my assembly jar to the executors. Once it does fetch my jar,
 those exceptions stop. Next, all the executor tasks die with the following
 exception:

 java.nio.ReadOnlyBufferException
 at java.nio.ByteBuffer.array(ByteBuffer.java:961)
 at

 org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)

 AFAIK, I'm not doing anything out of the ordinary, just turning on kryo and
 using the registrator mechanism to register a couple custom serializers.

 The reason I tried turning on kryo for closure in the first place is
 because of a different bug that I was hitting during fetching and
 deserializing of tasks from my executors, which I detailed here:


 http://apache-spark-user-list.1001560.n3.nabble.com/Crazy-Kryo-Exception-td5257.html

 Here's hoping some on this list can help me track down what's happening as
 I didn't get a single reply on the user list.



Re: bug using kryo as closure serializer

2014-05-04 Thread Mridul Muralidharan
On a slightly related note (apologies Soren for hijacking the thread),
Reynold how much better is kryo from spark's usage point of view
compared to the default java serialization (in general, not for
closures) ?
The numbers on the kryo site are interesting, but since you have played
the most with kryo in the context of spark (I think) - how do you rate it
along the lines of:

1) computational overhead compared to java serialization.
2) memory overhead.
3) generated byte[] size.


Particularly given the bugs Patrick and I had looked into in the past
around flush, etc., I was always skeptical about using kryo.
But given the pretty nasty issues with OOMs via java serialization we
are seeing, I wanted to know your thoughts on the use of kryo with spark.
(It will be slightly involved to ensure everything gets registered, but I
want to go down that path assuming I hear good things in the context of
spark.)

Thanks,
Mridul


On Mon, May 5, 2014 at 1:20 AM, Reynold Xin r...@databricks.com wrote:
 I added the config option to use the non-default serializer. However, at
 the time, Kryo fails serializing pretty much any closures so that option
 was never really used / recommended.

 Since then the Scala ecosystem has developed, and some other projects are
 starting to use Kryo to serialize more Scala data structures, so I wouldn't
 be surprised if there is a way to work around this now. However, I don't
 have enough time to look into it at this point. If you do, please do post
 your findings. Thanks.



 On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com wrote:

 apologies for the cross-list posts, but I've gotten zero response in the
 user list and I guess this list is probably more appropriate.

 According to the documentation, using the KryoSerializer for closures is
 supported. However, when I try to set `spark.closure.serializer` to
 `org.apache.spark.serializer.KryoSerializer` things fail pretty miserably.

 The first thing that happens is that it throws exceptions over and over
 that it cannot locate my registrator class, which is located in my assembly
 jar like so:

 14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run
 spark.kryo.registrator
 java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
 at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
 at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
 at java.security.AccessController.doPrivileged(Native Method)
 at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
 at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
 at java.lang.Class.forName0(Native Method)
 at java.lang.Class.forName(Class.java:270)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:63)
 at

 org.apache.spark.serializer.KryoSerializer$$anonfun$newKryo$2.apply(KryoSerializer.scala:61)
 at scala.Option.foreach(Option.scala:236)
 at
 org.apache.spark.serializer.KryoSerializer.newKryo(KryoSerializer.scala:61)
 at

 org.apache.spark.serializer.KryoSerializerInstance.init(KryoSerializer.scala:116)
 at

 org.apache.spark.serializer.KryoSerializer.newInstance(KryoSerializer.scala:79)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:180)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:41)
 at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:176)
 at

 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at

 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:724)

 Now, I would expect it not to be able to find this class since it hasn't
 yet fetched my assembly jar to the executors. Once it does fetch my jar,
 those exceptions stop. Next, all the executor tasks die with the following
 exception:

 java.nio.ReadOnlyBufferException
 at java.nio.ByteBuffer.array(ByteBuffer.java:961)
 at

 org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:136)
 at

 org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:193)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:42)
 at

 org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:41)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:415)
 at

 org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1438)
 at
 

Re: bug using kryo as closure serializer

2014-05-04 Thread Soren Macbeth
Thanks for the reply!

Ok, if that's the case, I'd recommend a note to that effect in the docs at
least.

Just to give some more context here, I'm working on a Clojure DSL for Spark
called Flambo, which I plan to open source shortly. If I could I'd like to
focus on the initial bug that I hit.

Exception in thread main org.apache.spark.SparkException: Job aborted:
Exception while deserializing and fetching task:
com.esotericsoftware.kryo.KryoException:
java.lang.IllegalArgumentException: Can not set final
scala.collection.convert.Wrappers field
scala.collection.convert.Wrappers$SeqWrapper.$outer to
clojure.lang.PersistentVector
Serialization trace:
$outer (scala.collection.convert.Wrappers$SeqWrapper)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

This happens immediately after all the tasks of a reduce stage complete
successfully. Here is the function throwing the exception:

https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43

This is where I get lost. From googling around, it seems that scala is
trying to wrap the result of my task, which contains
clojure.lang.PersistentVector objects, in a scala collection, but I don't
know why it's doing that. I have a registered kryo serializer for
clojure.lang.PersistentVector.

based on this line it looks like it's trying to use the closure serializer,
yet the exception thrown is a com.esotericsoftware.kryo.KryoException:

https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39

Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer from
trying to deal with my clojure.lang.PersistentVector class?

Where do I go from here?


On Sun, May 4, 2014 at 12:50 PM, Reynold Xin r...@databricks.com wrote:

 I added the config option to use the non-default serializer. However, at
 the time, Kryo fails serializing pretty much any closures so that option
 was never really used / recommended.

 Since then the Scala ecosystem has developed, and some other projects are
 starting to use Kryo to serialize more Scala data structures, so I wouldn't
 be surprised if there is a way to work around this now. However, I don't
 have enough time to look into it at this point. If you do, please do post
 your findings. Thanks.



 On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com wrote:

  apologies for the cross-list posts, but I've gotten zero response in the
  user list and I guess this list is probably more appropriate.
 
  According to the documentation, using the KryoSerializer for closures is
  supported. However, when I try to set `spark.closure.serializer` to
  `org.apache.spark.serializer.KryoSerializer` thing fail pretty miserably.
 
  The first thing that happens it that is throws exceptions over and over
  that it cannot locate my registrator class, which is located in my
 assembly
  jar like so:
 
  14/05/04 12:03:20 ERROR serializer.KryoSerializer: Failed to run
  spark.kryo.registrator
  java.lang.ClassNotFoundException: pickles.kryo.PicklesRegistrator
  at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
  at 

Re: bug using kryo as closure serializer

2014-05-04 Thread Reynold Xin
Good idea. I submitted a pull request for the doc update here:
https://github.com/apache/spark/pull/642


On Sun, May 4, 2014 at 3:54 PM, Soren Macbeth so...@yieldbot.com wrote:

 Thanks for the reply!

 Ok, if that's the case, I'd recommend a note to that effect in the docs at
 least.

 Just to give some more context here, I'm working on a Clojure DSL for Spark
 called Flambo, which I plan to open source shortly. If I could I'd like to
 focus on the initial bug that I hit.

 Exception in thread main org.apache.spark.SparkException: Job aborted:
 Exception while deserializing and fetching task:
 com.esotericsoftware.kryo.KryoException:
 java.lang.IllegalArgumentException: Can not set final
 scala.collection.convert.Wrappers field
 scala.collection.convert.Wrappers$SeqWrapper.$outer to
 clojure.lang.PersistentVector
 Serialization trace:
 $outer (scala.collection.convert.Wrappers$SeqWrapper)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
 at

 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
 at
 scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 at org.apache.spark.scheduler.DAGScheduler.org
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
 at scala.Option.foreach(Option.scala:236)
 at

 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
 at

 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
 at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
 at akka.actor.ActorCell.invoke(ActorCell.scala:456)
 at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
 at akka.dispatch.Mailbox.run(Mailbox.scala:219)
 at

 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at

 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at

 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

 This happens immediately after all the tasks of a reduce stage complete
 successfully. Here is the function throwing the exception:


 https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43

 This is where I get lost. From googling around, it seems that scala is
 trying to wrap the result of my task, which contains
 clojure.lang.PersistentVector objects, in a scala collection, but I don't
 know why it's doing that. I have a registered kryo serializer for
 clojure.lang.PersistentVector.

 based on this line it looks like it's trying to use the closure serializer,
 yet the exception thrown is a com.esotericsoftware.kryo.KryoException:


 https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39

 Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer from
 trying to deal with my clojure.lang.PersistentVector class?

 Where do I go from here?


 On Sun, May 4, 2014 at 12:50 PM, Reynold Xin r...@databricks.com wrote:

  I added the config option to use the non-default serializer. However, at
  the time, Kryo fails serializing pretty much any closures so that option
  was never really used / recommended.
 
  Since then the Scala ecosystem has developed, and some other projects are
  starting to use Kryo to serialize more Scala data structures, so I
 wouldn't
  be surprised if there is a way to work around this now. However, I don't
  have enough time to look into it at this point. If you do, please do post
  your findings. Thanks.
 
 
 
  On Sun, May 4, 2014 at 10:35 AM, Soren Macbeth so...@yieldbot.com
 wrote:
 
   apologies for the cross-list posts, but I've gotten zero response in
 the
   user list and I guess this list is probably more appropriate.
  
   According to the documentation, using the KryoSerializer for closures
 is
   supported. However, when I try to set `spark.closure.serializer` to
   `org.apache.spark.serializer.KryoSerializer` thing fail pretty
 miserably.
  
   The first thing that happens it that is throws exceptions over and over
   that it cannot locate my registrator class, which is located in my
  assembly
   

Re: bug using kryo as closure serializer

2014-05-04 Thread Reynold Xin
Thanks. Do you mind playing with chill-scala a little bit and see if it
actually works well for closures? One way to try is to hard code the
serializer to use Kryo with chill-scala, and then run through all the unit
tests.

If it works well, we can incorporate that in the next release (probably not
1.0, but after that).
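
A rough standalone sketch of that kind of check, assuming chill-scala 0.3.x
and Kryo 2.x on the classpath, before wiring it into the Spark serializer:

import com.twitter.chill.ScalaKryoInstantiator
import com.esotericsoftware.kryo.io.{Input, Output}
import java.io.ByteArrayOutputStream

// Round-trip a plain Scala closure through chill-scala's Kryo.
val kryo = (new ScalaKryoInstantiator).newKryo()
val f: Int => Int = x => x + 1

val bos = new ByteArrayOutputStream()
val out = new Output(bos)
kryo.writeClassAndObject(out, f)
out.close()

val g = kryo.readClassAndObject(new Input(bos.toByteArray)).asInstanceOf[Int => Int]
println(g(41))   // 42 if the closure survived the round trip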


On Sun, May 4, 2014 at 9:08 PM, Soren Macbeth so...@yieldbot.com wrote:

 fwiw, it seems like it wouldn't be very difficult to integrate chill-scala,
 since you're already using chill-java and probably get kryo serialization of
 closures and all sorts of other scala stuff for free. All that would be
 needed would be to include the dependency and then update KryoSerializer to
 register the stuff in chill-scala.

 In that case, you could probably safely make kryo the default serializer,
 which I think would be desirable in general.


 On Sun, May 4, 2014 at 8:48 PM, Reynold Xin r...@databricks.com wrote:

  Good idea. I submitted a pull request for the doc update here:
  https://github.com/apache/spark/pull/642
 
 
  On Sun, May 4, 2014 at 3:54 PM, Soren Macbeth so...@yieldbot.com
 wrote:
 
   Thanks for the reply!
  
   Ok, if that's the case, I'd recommend a note to that affect in the docs
  at
   least.
  
   Just to give some more context here, I'm working on a Clojure DSL for
  Spark
   called Flambo, which I plan to open source shortly. If I could I'd like
  to
   focus on the initial bug that I hit.
  
   Exception in thread main org.apache.spark.SparkException: Job
 aborted:
   Exception while deserializing and fetching task:
   com.esotericsoftware.kryo.KryoException:
   java.lang.IllegalArgumentException: Can not set final
   scala.collection.convert.Wrappers field
   scala.collection.convert.Wrappers$SeqWrapper.$outer to
   clojure.lang.PersistentVector
   Serialization trace:
   $outer (scala.collection.convert.Wrappers$SeqWrapper)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
   at
  
  
 
 scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
   at
   scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
   at org.apache.spark.scheduler.DAGScheduler.org
  
 $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:604)
   at scala.Option.foreach(Option.scala:236)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:604)
   at
  
  
 
 org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
   at akka.actor.ActorCell.invoke(ActorCell.scala:456)
   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
   at akka.dispatch.Mailbox.run(Mailbox.scala:219)
   at
  
  
 
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
   at
   scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
   at
  
  
 
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
   at
  
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
   at
  
  
 
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
  
   This happens immediately after all the tasks of a reduce stage complete
   successfully. Here is the function throwing the exception:
  
  
  
 
 https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L43
  
   This is where I get lost. From googling around, it seems that scala is
   trying to wrap the result of my task, which contain
   clojure.lang.PersistentVector objects in a scala collection, but I
 don't
   know why it's doing that. I have a registered kryo serializer for
   clojure.lang.PersistentVector.
  
   based on this line is looks like it's trying to use the closure
  serializer,
   yet the expection thrown is from
 com.esotericsoftware.kryo.KryoException:
  
  
  
 
 https://github.com/apache/spark/blob/4bc07eebbf5e2ea0c0b6f1642049515025d88d07/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala#L39
  
   Would storing my RDD as MEMORY_ONLY_SER prevent the closure serializer
  from
   trying to deal with my clojure.lang.PeristentVector class?
  
   Where do I go from here?
  
  
   On Sun, May 4, 2014 at 12:50 PM, Reynold Xin