Beautiful Spark Code

2020-03-28 Thread Zahid Rahman
You will be pleased to learn that Mr. Matthew Powers has seen to my needs
and answered all my questions.

He has seen to all my needs.

Mr. Powers has shut me up!!!

Mr. Powers has made Google search, Stack Overflow and u...@spark.apache.org
redundant.

That is all you guys and girls had to do: point me to his book.

https://leanpub.com/beautiful-spark

https://mungingdata.com/writing-beautiful-apache-spark2-code-with-scala/




On Sat, 28 Mar 2020, 16:49 Zahid Rahman,  wrote:

> Thanks for the tip!
>
> But if the first thing you come across is somebody using the trim function
> to strip away spaces in /etc/hostnames, like so, from:
>
> 127.0.0.1 hostname local
>
> To
>
> 127.0.0.1hostnamelocal
>
> Then there is a log error message showing the outcome of unnecessarily
> using the trim function.
>
> Especially when one of Spark's core capabilities is to read lines from
> files separated by a space or a comma.
>
> Also, have you seen the log4j.properties settings turned to ERROR, and in
> one case FATAL, to suppress discrepancies?
>
> May I please draw your attention, and the attention of all in the
> community, to this page, which shows turning on compiler WARNINGS before
> releasing software, among other software best practices.
>
> “The Power of 10 — NASA’s Rules for Coding” by Riccardo Giorato
> https://link.medium.com/PUz88PIql3
>
> What impression would you have?
>
>
>
> On Sat, 28 Mar 2020, 15:50 Jeff Evans, 
> wrote:
>
>> Dude, you really need to chill. Have you ever worked with a large open
>> source project before? It seems not. Even so, insinuating there are tons of
>> bugs that were left uncovered until you came along (despite the fact that
>> the project is used by millions across many different organizations) is
>> ludicrous. Learn a little bit of humility.
>>
>> If you're new to something, assume you have made a mistake rather than
>> that there is a bug. Lurk a bit more, or even do a simple Google search,
>> and you will realize Sean is a very senior committer (i.e. expert) in
>> Spark, and has been for many years. He, and everyone else participating in
>> these lists, is doing it voluntarily on their own time. They're not being
>> paid to handhold you and quickly answer to your every whim.
>>
>> On Sat, Mar 28, 2020, 10:46 AM Zahid Rahman  wrote:
>>
>>> So the schema is limited to holding only the DEFINITION of the schema: for
>>> example, as you say, the columns, i.e. first column User: Int, second
>>> column Password: String.
>>>
>>> Not the location of the source, i.e. a CSV file with or without a header,
>>> or SQL DB tables.
>>>
>>> I am pleased that, for once, I am wrong about it being another bug; it was
>>> a design decision that adds flexibility.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
>>> wrote:
>>>
>>>> This is probably more of a question for the user support list, but I
>>>> believe I understand the issue.
>>>>
>>>> Schema inside of Spark refers to the structure of the output rows; for
>>>> example, the schema for a particular dataframe could be
>>>> (User: Int, Password: String) - two columns, the first is User of type
>>>> Int and the second is Password of type String.
>>>>
>>>> When you pass the schema from one reader to another, you are only
>>>> copying this structure, not all of the other options associated with the
>>>> dataframe.
>>>> This is usually useful when you are reading from sources with different
>>>> options but data that needs to be read into the same structure.
>>>>
>>>> The other properties such as "format" and "options" exist independently
>>>> of Schema. This is helpful if I were reading from both MySQL and
>>>> a comma-separated file, for example. While the Schema is the same, the
>>>> options like "inferSchema" do not apply to both MySQL and CSV, and
>>>> format actually picks whether to use "JDBC" or "CSV", so copying that
>>>> wouldn't be helpful either.
>>>>
>>>> I hope this clears things up,
>>>> Russ
>>>>
>>>> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman 
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>>>>
>>>>> As you can see from the code :
>>>>>
>>>>> STEP 1:  I  create a object of type s

Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
Thanks for the tip!

But if the first thing you come across is somebody using the trim function
to strip away spaces in /etc/hostnames, like so, from:

127.0.0.1 hostname local

To

127.0.0.1hostnamelocal

Then there is a log error message showing the outcome of unnecessarily
using the trim function.

Especially when one of Spark's core capabilities is to read lines from
files separated by a space or a comma.

Also, have you seen the log4j.properties settings turned to ERROR, and in
one case FATAL, to suppress discrepancies?

May I please draw your attention, and the attention of all in the
community, to this page, which shows turning on compiler WARNINGS before
releasing software, among other software best practices.

“The Power of 10 — NASA’s Rules for Coding” by Riccardo Giorato
https://link.medium.com/PUz88PIql3

What impression would you have?



On Sat, 28 Mar 2020, 15:50 Jeff Evans, 
wrote:

> Dude, you really need to chill. Have you ever worked with a large open
> source project before? It seems not. Even so, insinuating there are tons of
> bugs that were left uncovered until you came along (despite the fact that
> the project is used by millions across many different organizations) is
> ludicrous. Learn a little bit of humility.
>
> If you're new to something, assume you have made a mistake rather than
> that there is a bug. Lurk a bit more, or even do a simple Google search,
> and you will realize Sean is a very senior committer (i.e. expert) in
> Spark, and has been for many years. He, and everyone else participating in
> these lists, is doing it voluntarily on their own time. They're not being
> paid to handhold you and quickly answer to your every whim.
>
> On Sat, Mar 28, 2020, 10:46 AM Zahid Rahman  wrote:
>
>> So the schema is limited to holding only the DEFINITION of the schema: for
>> example, as you say, the columns, i.e. first column User: Int, second
>> column Password: String.
>>
>> Not the location of the source, i.e. a CSV file with or without a header,
>> or SQL DB tables.
>>
>> I am pleased that, for once, I am wrong about it being another bug; it was
>> a design decision that adds flexibility.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
>> wrote:
>>
>>> This is probably more of a question for the user support list, but I
>>> believe I understand the issue.
>>>
>>> Schema inside of Spark refers to the structure of the output rows; for
>>> example, the schema for a particular dataframe could be
>>> (User: Int, Password: String) - two columns, the first is User of type
>>> Int and the second is Password of type String.
>>>
>>> When you pass the schema from one reader to another, you are only
>>> copying this structure, not all of the other options associated with the
>>> dataframe.
>>> This is usually useful when you are reading from sources with different
>>> options but data that needs to be read into the same structure.
>>>
>>> The other properties such as "format" and "options" exist independently
>>> of Schema. This is helpful if I were reading from both MySQL and
>>> a comma-separated file, for example. While the Schema is the same, the
>>> options like "inferSchema" do not apply to both MySQL and CSV, and
>>> format actually picks whether to use "JDBC" or "CSV", so copying that
>>> wouldn't be helpful either.
>>>
>>> I hope this clears things up,
>>> Russ
>>>
>>> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman 
>>> wrote:
>>>
>>>> Hi,
>>>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>>>
>>>> As you can see from the code :
>>>>
>>>> STEP 1: I create an object of type static data frame which holds all the
>>>> information about the data source (CSV files).
>>>>
>>>> STEP 2: Then I create a variable called staticSchema, assigning it the
>>>> schema information from the original static data frame.
>>>>
>>>> STEP 3: Then I create another variable, val streamingDataFrame, of
>>>> type spark.readStream,
>>>> and into the .schema function parameter I pass the object staticSchema,
>>>> which is meant to hold the information about the CSV files, including the
>>>> .load(path) function etc.
>>>>
>>>> So then, when I am creating val streamingDataFrame and passing it
>>>> .schema(staticSchema),
>>>> the variable streamingDataFrame should have all the information.
>>>> I should only have to call .optio

Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
So the schema is limited to holding only the DEFINITION of the schema: for
example, as you say, the columns, i.e. first column User: Int, second
column Password: String.

Not the location of the source, i.e. a CSV file with or without a header,
or SQL DB tables.

I am pleased that, for once, I am wrong about it being another bug; it was
a design decision that adds flexibility.
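
For illustration, here is a rough sketch of the flexibility described in the
reply quoted below. It assumes the Spark 3.0 Scala API and an existing
SparkSession named spark; the JDBC URL and table name are placeholders, not
anything taken from this thread. The point is that the schema lifted off one
reader is only a StructType (column names and types), so it can be handed to
readers with entirely different formats and options:

val staticSchema = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/retail-data/by-day/*.csv")
  .schema                                  // a StructType: column names and types only

// Reuse the structure in a streaming CSV reader; format, options and the
// path are source-specific and still have to be supplied here.
val csvStream = spark.readStream
  .schema(staticSchema)
  .format("csv")
  .option("header", "true")
  .option("maxFilesPerTrigger", 1)
  .load("/data/retail-data/by-day/*.csv")

// The same structure could equally describe rows coming from a JDBC source,
// whose reader takes completely different options (placeholder connection).
val jdbcDf = spark.read
  .format("jdbc")
  .option("url", "jdbc:mysql://dbhost:3306/retail")
  .option("dbtable", "retail_data")
  .load()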









On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
wrote:

> This is probably more of a question for the user support list, but I
> believe I understand the issue.
>
> Schema inside of Spark refers to the structure of the output rows; for
> example, the schema for a particular dataframe could be
> (User: Int, Password: String) - two columns, the first is User of type Int
> and the second is Password of type String.
>
> When you pass the schema from one reader to another, you are only
> copying this structure, not all of the other options associated with the
> dataframe.
> This is usually useful when you are reading from sources with different
> options but data that needs to be read into the same structure.
>
> The other properties such as "format" and "options" exist independently of
> Schema. This is helpful if I were reading from both MySQL and
> a comma-separated file, for example. While the Schema is the same, the
> options like "inferSchema" do not apply to both MySQL and CSV, and
> format actually picks whether to use "JDBC" or "CSV", so copying that
> wouldn't be helpful either.
>
> I hope this clears things up,
> Russ
>
> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman  wrote:
>
>> Hi,
>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>
>> As you can see from the code :
>>
>> STEP 1: I create an object of type static data frame which holds all the
>> information about the data source (CSV files).
>>
>> STEP 2: Then I create a variable called staticSchema, assigning it the
>> schema information from the original static data frame.
>>
>> STEP 3: Then I create another variable, val streamingDataFrame, of
>> type spark.readStream,
>> and into the .schema function parameter I pass the object staticSchema,
>> which is meant to hold the information about the CSV files, including the
>> .load(path) function etc.
>>
>> So then, when I am creating val streamingDataFrame and passing it
>> .schema(staticSchema),
>> the variable streamingDataFrame should have all the information.
>> I should only have to call .option("maxFilesPerTrigger",1) and not
>> .format("csv").option("header","true").load("/data/retail-data/by-day/*.csv").
>> Otherwise, what is the point of passing .schema(staticSchema) to
>> streamingDataFrame?
>>
>> You can replicate it using the complete code below.
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.{window,column,desc,col}
>>
>> object RetailData {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> // create spark session
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
>> Data").getOrCreate();
>> // set spark runtime  configuration
>> spark.conf.set("spark.sql.shuffle.partitions","5")
>> 
>> spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")
>>
>> // create a static frame
>>   val staticDataFrame = spark.read.format("csv")
>> .option ("header","true")
>> .option("inferschema","true")
>> .load("/data/retail-data/by-day/*.csv")
>>
>>
>> staticDataFrame.createOrReplaceTempView("retail_data")
>> val staticSchema = staticDataFrame.schema
>>
>> staticDataFrame
>>   .selectExpr(
>> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>>   .groupBy(col("CustomerId"),
>> window(col("InvoiceDate"),
>> "1 day"))
>>   .sum("total_cost")
>>   .sort(desc("sum(total_cost)"))
>>   .show(2)
>>
>> val streamingDataFrame = spark.readStream
>>   .schema(staticSchema)
>>   .format("csv")
>>   .option("maxFilesPerTrigger", 1)
>>   .option("header","true")
>>   .load("/data/retail-data/by-day/*.csv")
>>
>>   println(streamingDataFrame.isStreaming)
>>
>> // lazy operation so we will need to call a streaming action to start 
>> the action
>> val purchaseByCustomerPerHour = streamingDataFrame
>> .selectExpr(
>>   "CustomerId",
>>   "(UnitPrice * Quantity) as total_cost",
>>   "InvoiceDate")
>> .groupBy(
>>   col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> .sum("total_cost")
>>
>> // stream action to write to console
>> purchaseByCustomerPerHour.writeStream
>>   .format("console")
>>   .queryName("customer_purchases")
>>   .outputMode("complete")
>>   .start()
>>
>>   } // main
>>
>> } // object
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> val staticSchema = staticDataFrame.schema
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> <http://www.backbutton.co.uk>
>>
>


Re: BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-28 Thread Zahid Rahman
Very kind of you.

On Sat, 28 Mar 2020, 15:24 Russell Spitzer, 
wrote:

> This is probably more of a question for the user support list, but I
> believe I understand the issue.
>
> Schema inside of Spark refers to the structure of the output rows; for
> example, the schema for a particular dataframe could be
> (User: Int, Password: String) - two columns, the first is User of type Int
> and the second is Password of type String.
>
> When you pass the schema from one reader to another, you are only
> copying this structure, not all of the other options associated with the
> dataframe.
> This is usually useful when you are reading from sources with different
> options but data that needs to be read into the same structure.
>
> The other properties such as "format" and "options" exist independently of
> Schema. This is helpful if I were reading from both MySQL and
> a comma-separated file, for example. While the Schema is the same, the
> options like "inferSchema" do not apply to both MySQL and CSV, and
> format actually picks whether to use "JDBC" or "CSV", so copying that
> wouldn't be helpful either.
>
> I hope this clears things up,
> Russ
>
> On Sat, Mar 28, 2020, 12:33 AM Zahid Rahman  wrote:
>
>> Hi,
>> version: spark-3.0.0-preview2-bin-hadoop2.7
>>
>> As you can see from the code :
>>
>> STEP 1: I create an object of type static data frame which holds all the
>> information about the data source (CSV files).
>>
>> STEP 2: Then I create a variable called staticSchema, assigning it the
>> schema information from the original static data frame.
>>
>> STEP 3: Then I create another variable, val streamingDataFrame, of
>> type spark.readStream,
>> and into the .schema function parameter I pass the object staticSchema,
>> which is meant to hold the information about the CSV files, including the
>> .load(path) function etc.
>>
>> So then, when I am creating val streamingDataFrame and passing it
>> .schema(staticSchema),
>> the variable streamingDataFrame should have all the information.
>> I should only have to call .option("maxFilesPerTrigger",1) and not
>> .format("csv").option("header","true").load("/data/retail-data/by-day/*.csv").
>> Otherwise, what is the point of passing .schema(staticSchema) to
>> streamingDataFrame?
>>
>> You can replicate it using the complete code below.
>>
>> import org.apache.spark.sql.SparkSession
>> import org.apache.spark.sql.functions.{window,column,desc,col}
>>
>> object RetailData {
>>
>>   def main(args: Array[String]): Unit = {
>>
>> // create spark session
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Retail 
>> Data").getOrCreate();
>> // set spark runtime  configuration
>> spark.conf.set("spark.sql.shuffle.partitions","5")
>> 
>> spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")
>>
>> // create a static frame
>>   val staticDataFrame = spark.read.format("csv")
>> .option ("header","true")
>> .option("inferschema","true")
>> .load("/data/retail-data/by-day/*.csv")
>>
>>
>> staticDataFrame.createOrReplaceTempView("retail_data")
>> val staticSchema = staticDataFrame.schema
>>
>> staticDataFrame
>>   .selectExpr(
>> "CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
>>   .groupBy(col("CustomerId"),
>> window(col("InvoiceDate"),
>> "1 day"))
>>   .sum("total_cost")
>>   .sort(desc("sum(total_cost)"))
>>   .show(2)
>>
>> val streamingDataFrame = spark.readStream
>>   .schema(staticSchema)
>>   .format("csv")
>>   .option("maxFilesPerTrigger", 1)
>>   .option("header","true")
>>   .load("/data/retail-data/by-day/*.csv")
>>
>>   println(streamingDataFrame.isStreaming)
>>
>> // lazy operation so we will need to call a streaming action to start 
>> the action
>> val purchaseByCustomerPerHour = streamingDataFrame
>> .selectExpr(
>>   "CustomerId",
>>   "(UnitPrice * Quantity) as total_cost",
>>   "InvoiceDate")
>> .groupBy(
>>   col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> .sum("total_cost")
>>
>> // stream action to write to console
>> purchaseByCustomerPerHour.writeStream
>>   .format("console")
>>   .queryName("customer_purchases")
>>   .outputMode("complete")
>>   .start()
>>
>>   } // main
>>
>> } // object
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> val staticSchema = staticDataFrame.schema
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> <http://www.backbutton.co.uk>
>>
>


BUG: spark.readStream .schema(staticSchema) not receiving schema information

2020-03-27 Thread Zahid Rahman
Hi,
version: spark-3.0.0-preview2-bin-hadoop2.7

As you can see from the code :

STEP 1: I create an object of type static data frame which holds all the
information about the data source (CSV files).

STEP 2: Then I create a variable called staticSchema, assigning it the
schema information from the original static data frame.

STEP 3: Then I create another variable, val streamingDataFrame, of
type spark.readStream,
and into the .schema function parameter I pass the object staticSchema,
which is meant to hold the information about the CSV files, including the
.load(path) function etc.

So then, when I am creating val streamingDataFrame and passing it
.schema(staticSchema),
the variable streamingDataFrame should have all the information.
I should only have to call .option("maxFilesPerTrigger",1) and not
.format("csv").option("header","true").load("/data/retail-data/by-day/*.csv").
Otherwise, what is the point of passing .schema(staticSchema) to
streamingDataFrame?

You can replicate it using the complete code below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {

  def main(args: Array[String]): Unit = {

// create spark session
val spark = SparkSession.builder()
  .master("spark://192.168.0.38:7077")
  .appName("Retail Data")
  .getOrCreate()
// set spark runtime  configuration
spark.conf.set("spark.sql.shuffle.partitions","5")

spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation","True")

// create a static data frame
val staticDataFrame = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/retail-data/by-day/*.csv")


staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema

staticDataFrame
  .selectExpr(
"CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
  .groupBy(col("CustomerId"),
window(col("InvoiceDate"),
"1 day"))
  .sum("total_cost")
  .sort(desc("sum(total_cost)"))
  .show(2)

val streamingDataFrame = spark.readStream
  .schema(staticSchema)
  .format("csv")
  .option("maxFilesPerTrigger", 1)
  .option("header","true")
  .load("/data/retail-data/by-day/*.csv")

  println(streamingDataFrame.isStreaming)

// lazy operation, so we will need to call a streaming action to start the action
val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr(
  "CustomerId",
  "(UnitPrice * Quantity) as total_cost",
  "InvoiceDate")
.groupBy(
  col("CustomerId"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")

// stream action to write to console
purchaseByCustomerPerHour.writeStream
  .format("console")
  .queryName("customer_purchases")
  .outputMode("complete")
  .start()

  } // main

} // object
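
Since the schema handed to readStream above is nothing more than a StructType,
it could also be declared by hand instead of being lifted off the static read.
This is only a sketch: the column names beyond those referenced in the code
(CustomerId, UnitPrice, Quantity, InvoiceDate) and all of the types are
assumptions about the retail CSV files, not taken from the thread.

import org.apache.spark.sql.types._

val staticSchema = StructType(Seq(
  StructField("InvoiceNo", StringType),
  StructField("StockCode", StringType),
  StructField("Description", StringType),
  StructField("Quantity", IntegerType),
  StructField("InvoiceDate", TimestampType),
  StructField("UnitPrice", DoubleType),
  StructField("CustomerId", DoubleType),
  StructField("Country", StringType)
))

// Passing this to spark.readStream.schema(...) supplies exactly this column
// structure and nothing else; format, header handling and the file path
// still have to be stated on the streaming reader.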




















val staticSchema = staticDataFrame.schema













Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: OFF TOPIC LIST CRITERIA

2020-03-27 Thread Zahid Rahman
OK *user support. user@ is DONE !!!*

I reported a workaround for an existing bug, actually, to the experienced
user,
and "the experienced user" was "not aware" of the
setting in log4j.properties, so he learned something new too.
Clearly neither were you.

Also, it may surprise some people, but there are people who have been
formally
trained in software development.
We can tell a self-trained one a mile away.


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>


On Sat, 28 Mar 2020 at 03:02, Sean Owen  wrote:

> BCC user, dev, and I encourage others to not reply.
>
> I said _dev@_ is not for user support. user@ is. You heard that
> yesterday, too, and not to cross-post.
> You actually got answers to several questions, despite their tone,
> from experienced developers of the project.
>
> Messages like yours are, I assure you, not useful to _anybody_. If we
> let people talk like this on big community lists, yes _that_ will put
> up barriers.
> So, the answer for you is: you are not using either of these lists
> appropriately right now. If you can keep it civil and on-topic, use
> user@.
> Otherwise we will block you from the lists.
>
>
> Sean
>
> On Fri, Mar 27, 2020 at 9:46 PM Zahid Rahman  wrote:
> >
> >
> > Sean Owen says the criterion of these two mailing lists is not to help
> > somebody who is new, but to serve people who have been using the software
> > for a long time.
> >
> > He is implying, I think, that I should only send email when I find bugs, so
> > that I can help him in his work.
> > A one-way street.
> >
> > He is suggesting that the more familiar you are with this software, the
> > more important you are.
> > Some kind of alpha-male-type hierarchy.
> >
> > He wants to put a barrier in place where the Apache foundation wants no
> > barriers to free learning and free software.
> >
> > He has not reported any bugs, while I have reported so many in such a
> > short space of time.
> > He has warned me as well.
> >
> > So that Sean Owen does not put a barrier in my path to free learning and
> > free Apache software,
> > I would like somebody to clarify the criteria for me.
> >
> >
> > Backbutton.co.uk
> > ¯\_(ツ)_/¯
> > ♡۶Java♡۶RMI ♡۶
> > Make Use Method {MUM}
> > makeuse.org
>


OFF TOPIC LIST CRITERIA

2020-03-27 Thread Zahid Rahman
Sean Owen says the criterion of these two mailing lists is not to help
somebody who is new,
but to serve people who have been using the software for a long time.

He is implying, I think, that I should only send email when I find bugs, so
that I can help him in his work.
A one-way street.

He is suggesting that the more familiar you are with this software, the more
important you are.
Some kind of alpha-male-type hierarchy.

He wants to put a barrier in place where the Apache foundation wants no
barriers to free learning and free software.

He has not reported any bugs, while I have reported so many in such a short
space of time.
He has warned me as well.

So that Sean Owen does not put a barrier in my path to free learning and
free Apache software,
I would like somebody to clarify the criteria for me.


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: spark.readStream.schema(??)

2020-03-27 Thread Zahid Rahman
I found another bug.

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>


On Sat, 28 Mar 2020 at 02:12, Zahid Rahman  wrote:

>
> I have sorted the error anyway because I am the best there is.
> It is downhill for me from here.
>
> There is nobody using this email list anyway for anything. The list is as
> dead as a dodo,
> probably because of people like you.
>
> *That is exactly what this email list is for.*
> *It is not just for me to test your buggy software and report the bugs
> free of cost,*
>
> *and not get anything in return.*
>
> *In other words, free consultancy for you, because you know the software*
> *after spending years of your life on it, while I am going to master it in weeks.*
>
> Have we eaten something that disagrees with us today?
> Do you have a sore throat?
> Maybe a little temperature?
>
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> <http://www.backbutton.co.uk>
>
>
> On Sat, 28 Mar 2020 at 02:03, Sean Owen  wrote:
>
>> (this isn't an email list for user support)
>>
>> On Fri, Mar 27, 2020 at 8:32 PM Zahid Rahman 
>> wrote:
>> >
>> > version: spark-3.0.0-preview2-bin-hadoop2.7
>> >
>> > The syntax checker objects to the following argument which is what I am
>> supposed to enter.
>> >
>> > .schema(staticSchema)
>> >
>> > However when I  provide the  following argument it works but I don't
>> think that is correct.
>> > What is the correct argument for this case ?
>> >
>> > import org.apache.spark.sql.SparkSession
>> > import org.apache.spark.sql.functions.{window,column,desc,col}
>> >
>> > object RetailData {
>> >
>> >
>> >   def main(args: Array[String]): Unit = {
>> >
>> > // crete spark session
>> > val spark = SparkSession.builder().master("spark://
>> 192.168.0.38:7077").appName("Retail Data").getOrCreate();
>> > // set spark runtime  configuration
>> > spark.conf.set("spark,sql.shuffle.partitions","5")
>> >
>> > // create a static frame
>> >   val staticDataFrame = spark.read.format("csv")
>> > .option ("header","true")
>> > .option("inferschema","true")
>> > .load("/data/retail-data/by-day/*.csv")
>> >
>> > staticDataFrame.createOrReplaceTempView("retail_data")
>> > val staticFrame = staticDataFrame.schema
>> >
>> > staticDataFrame
>> >   .selectExpr(
>> > "CustomerId","UnitPrice * Quantity as total_cost",
>> "InvoiceDate")
>> >   .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
>> >   .sum("total_cost")
>> >   .sort(desc("sum(total_cost)"))
>> >   .show(2)
>> >
>> > val streamingDataFrame = spark.readStream
>> >   .schema(staticDataFrame.schema)
>> >   .option("maxFilesPerTrigger", 1)
>> >   .load("/data/retail-data/by-day/*.csv")
>> >
>> >   println(streamingDataFrame.isStreaming)
>> >
>> >   } // main
>> >
>> > } // object
>> >
>> >
>> >
>> >
>> >
>> >
>> >
>> > Backbutton.co.uk
>> > ¯\_(ツ)_/¯
>> > ♡۶Java♡۶RMI ♡۶
>> > Make Use Method {MUM}
>> > makeuse.org
>>
>


spark.readStream.schema(??)

2020-03-27 Thread Zahid Rahman
version: spark-3.0.0-preview2-bin-hadoop2.7

The syntax checker objects to the following argument, which is what I am
supposed to enter:

.schema(staticSchema)

However, when I provide the following argument, it works, but I don't think
that is correct.
What is the correct argument for this case?

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {


  def main(args: Array[String]): Unit = {

// create spark session
val spark = SparkSession.builder()
  .master("spark://192.168.0.38:7077")
  .appName("Retail Data")
  .getOrCreate()
// set spark runtime configuration
spark.conf.set("spark.sql.shuffle.partitions","5")

// create a static data frame
val staticDataFrame = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
val staticFrame = staticDataFrame.schema

staticDataFrame
  .selectExpr(
"CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
  .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
  .sum("total_cost")
  .sort(desc("sum(total_cost)"))
  .show(2)

val streamingDataFrame = spark.readStream
  .schema(staticDataFrame.schema)
  .option("maxFilesPerTrigger", 1)
  .load("/data/retail-data/by-day/*.csv")

  println(streamingDataFrame.isStreaming)

  } // main

} // object
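
One observation on the listing above (an observation about the code, not an
authoritative answer): the schema value is bound to the name staticFrame, so
an argument written as .schema(staticSchema) has nothing to resolve against,
which would explain the syntax checker's objection. Keeping the declaration
and the use consistent compiles; a minimal sketch:

val staticSchema = staticDataFrame.schema   // declared under the name that will be used

val streamingDataFrame = spark.readStream
  .schema(staticSchema)                     // now resolves
  .format("csv")
  .option("header", "true")
  .option("maxFilesPerTrigger", 1)
  .load("/data/retail-data/by-day/*.csv")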







Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



what a plava !

2020-03-27 Thread Zahid Rahman
I was very impressed with the amount of material available from
https://github.com/databricks/Spark-The-Definitive-Guide/ -
over 450 *megabytes*.


I have corrected the Scala code by adding
*.sort(desc("sum(total_cost)"))* to the code provided on page 34 (see
below).

I have noticed numerous uses of exclamation marks, almost to the point of
overuse. For example:
page 23: Let's specify some more *transformations!*
page 24: you've read your first explain *plan!*
page 26: Notice that these plans compile to the exact same underlying *plan!*
page 29: The last step is our *action!*
page 34: The best thing about structured streaming ... rapidly ... with
*virtually no code*

1. I have never read a science book with such emotion of frustration.
Is Spark difficult to understand, made more complicated by the
proliferation of languages:
Scala, Java, Python, SQL, R?

2. Secondly, is the Spark architecture made more complex due to competing
technologies?

I have a Spark cluster set up with a master and a slave to load-balance heavy
activity, like so:
sbin/start-master.sh
sbin/start-slave.sh spark://192.168.0.38:7077
For load balancing, I imagine, conceptually speaking (although I haven't
tried it), I can have as many
slaves (workers) on other physical machines as I like, by simply downloading
the Spark zip file
and running workers from those other physical machine(s) with
sbin/start-slave.sh spark://192.168.0.38:7077.

*My question is: under the circumstances, do I need to bother with Mesos or
YARN?*

Collins dictionary
The exclamation mark is used after exclamations and emphatic expressions.

   - I can’t believe it!
   - Oh, no! Look at this mess!

The exclamation mark loses its effect if it is overused. It is better to
use a full stop after a sentence expressing mild excitement or humour.

   It was such a beautiful day.
   I felt like a perfect banana.


import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window,column,desc,col}

object RetailData {

  def main(args: Array[String]): Unit = {

val spark = SparkSession.builder()
  .master("spark://192.168.0.38:7077")
  .appName("Retail Data")
  .getOrCreate()

// create a static data frame
val staticDataFrame = spark.read.format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("/data/retail-data/by-day/*.csv")

staticDataFrame.createOrReplaceTempView("retail_data")
val staticFrame = staticDataFrame.schema

staticDataFrame
  .selectExpr(
"CustomerId","UnitPrice * Quantity as total_cost", "InvoiceDate")
  .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
  .sum("total_cost")
  .sort(desc("sum(total_cost)"))
  .show(1)

  } // main

} // object



Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



Re: Programmatic: parquet file corruption error

2020-03-27 Thread Zahid Rahman
Thanks Wenchen.  SOLVED! KINDA!

I removed all dependencies from the pom.xml in my IDE so I wouldn't be
picking up any libraries from the Maven repository.
I *instead* included the libraries (jars) from the *Spark download* of
*spark-3.0.0-preview2-bin-hadoop2.7*.
This way I am using the *same libraries* which are used when running the
*spark-submit scripts*.

I believe I managed to trace the issue.
I copied the log4j.properties.template into IntelliJ's resources
directory in my project,
obviously renaming it to log4j.properties.
So now I am also using the *same log4j.properties* as when running the
*spark-submit script*.

I noticed the values of *log4j.logger.org.apache.parquet=ERROR* and
*log4j.logger.parquet=ERROR*.
It appears that this parquet corruption warning is an *outstanding bug* and
the *workaround* is to quieten the warning.


#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
log4j.rootCategory=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss}
%p %c{1}: %m%n

# Set the default spark-shell log level to WARN. When running the
spark-shell, the
# log level for this class is used to overwrite the root logger's log
level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=WARN

# Settings to quiet third party logs that are too verbose
log4j.logger.org.sparkproject.jetty=WARN
log4j.logger.org.sparkproject.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=WARN
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up
nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR




Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>


On Fri, 27 Mar 2020 at 07:44, Wenchen Fan  wrote:

> Running Spark application with an IDE is not officially supported. It may
> work under some cases but there is no guarantee at all. The official way is
> to run interactive queries with spark-shell or package your application to
> a jar and use spark-submit.
>
> On Thu, Mar 26, 2020 at 4:12 PM Zahid Rahman  wrote:
>
>> Hi,
>>
>> When I run the code for a user-defined data type dataset, using a case class
>> in Scala, in the interactive spark-shell against a parquet
>> file, the results are as expected.
>> However, when I then run the same code programmatically in the IntelliJ
>> IDE, Spark gives a file corruption error.
>>
>> Steps I have taken to determine the source of error are :
>> I have tested for file permission and made sure to chmod 777 , just in
>> case.
>> I tried a fresh copy of same parquet file.
>> I ran both programme before and after the fresh copy.
>> I also rebooted then ran programmatically against a fresh parquet file.
>> The corruption error was consistent in all cases.
>> I have copy and pasted the spark-shell , the error message and the code
>> in the IDE and the pom.xml, IntelliJ java  classpath command line.
>>
>> Perhaps the code in the libraries are different than the ones  used by
>> spark-shell from that when run programmatically.
>> I don't believe it is an error on my part.
>>
>> <--
>>
>> 07:28:45 WARN  CorruptStatistics:117 - Ignoring statistics because
>> created_by could not be parsed (see PARQUET-251): parquet-mr (build
>> 32c46643845ea8a705c35d4ec8fc654cc8ff816d)
>> org.apache.parquet.VersionParser$VersionParseException: Could not parse
>&g

Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Zahid Rahman
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-slave.sh spark://
192.168.0.38:7077
~/spark-3.0.0-preview2-bin-hadoop2.7$ sbin/start-master.sh

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>


On Fri, 27 Mar 2020 at 06:12, Zahid Rahman  wrote:

> sbin/start-master.sh
> sbin/start-slave.sh spark://192.168.0.38:7077
>
> Backbutton.co.uk
> ¯\_(ツ)_/¯
> ♡۶Java♡۶RMI ♡۶
> Make Use Method {MUM}
> makeuse.org
> <http://www.backbutton.co.uk>
>
>
> On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:
>
>> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
>> just include Spark dependency in IntelliJ?
>>
>> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman 
>> wrote:
>>
>>> I have configured  in IntelliJ as external jars
>>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>>
>>> not pulling anything from maven.
>>>
>>> Backbutton.co.uk
>>> ¯\_(ツ)_/¯
>>> ♡۶Java♡۶RMI ♡۶
>>> Make Use Method {MUM}
>>> makeuse.org
>>> <http://www.backbutton.co.uk>
>>>
>>>
>>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>>
>>>> Which Spark/Scala version do you use?
>>>>
>>>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>>>> wrote:
>>>>
>>>>>
>>>>> with the following sparksession configuration
>>>>>
>>>>> val spark = SparkSession.builder().master("local[*]").appName("Spark 
>>>>> Session take").getOrCreate();
>>>>>
>>>>> this line works
>>>>>
>>>>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>>>> "Canada").map(flight_row => flight_row).take(5)
>>>>>
>>>>>
>>>>> however if change the master url like so, with the ip address then the
>>>>> following error is produced by the position of .take(5)
>>>>>
>>>>> val spark = 
>>>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>>>> Session take").getOrCreate();
>>>>>
>>>>>
>>>>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
>>>>> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>>>>> instance of java.lang.invoke.SerializedLambda to field
>>>>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in 
>>>>> instance
>>>>> of org.apache.spark.rdd.MapPartitionsRDD
>>>>>
>>>>> BUT if I  remove take(5) or change the position of take(5) or insert
>>>>> an extra take(5) as illustrated in code then it works. I don't see why the
>>>>> position of take(5) should cause such an error or be caused by changing 
>>>>> the
>>>>> master url
>>>>>
>>>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>>>> "Canada").map(flight_row => flight_row).take(5)
>>>>>
>>>>>   flights.take(5)
>>>>>
>>>>>   flights
>>>>>   .take(5)
>>>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
>>>>> + 5))
>>>>>flights.show(5)
>>>>>
>>>>>
>>>>> complete code if you wish to replicate it.
>>>>>
>>>>> import org.apache.spark.sql.SparkSession
>>>>>
>>>>> object sessiontest {
>>>>>
>>>>>   // define specific  data type class then manipulate it using the filter 
>>>>> and map functions
>>>>>   // this is also known as an Encoder
>>>>>   case class flight (DEST_COUNTRY_NAME: String,
>>>>>  ORIGIN_COUNTRY_NAME:String,
>>>>>  count: BigInt)
>>>>>
>>>>>
>>>>>   def main(args:Array[String]): Unit ={
>>>>>
>>>>> val spark = 
>>>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>>>> Session take").getOrCreate();
>>>>>
>>>>> import spark.implicits._
>>>>> val flightDf = 
>>>>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>>>>> val flights = flightDf.as[flight]
>>>>>
>>>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME 
>>>>> != "Canada").map(flight_row => flight_row).take(5)
>>>>>
>>>>>   flights.take(5)
>>>>>
>>>>>   flights
>>>>>   .take(5)
>>>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>>>>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>>>>flights.show(5)
>>>>>
>>>>>   } // main
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Backbutton.co.uk
>>>>> ¯\_(ツ)_/¯
>>>>> ♡۶Java♡۶RMI ♡۶
>>>>> Make Use Method {MUM}
>>>>> makeuse.org
>>>>> <http://www.backbutton.co.uk>
>>>>>
>>>>


Re: BUG: take with SparkSession.master[url]

2020-03-27 Thread Zahid Rahman
sbin/start-master.sh
sbin/start-slave.sh spark://192.168.0.38:7077

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>


On Fri, 27 Mar 2020 at 05:59, Wenchen Fan  wrote:

> Your Spark cluster, spark://192.168.0.38:7077, how is it deployed if you
> just include Spark dependency in IntelliJ?
>
> On Fri, Mar 27, 2020 at 1:54 PM Zahid Rahman  wrote:
>
>> I have configured  in IntelliJ as external jars
>> spark-3.0.0-preview2-bin-hadoop2.7/jar
>>
>> not pulling anything from maven.
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> <http://www.backbutton.co.uk>
>>
>>
>> On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:
>>
>>> Which Spark/Scala version do you use?
>>>
>>> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman 
>>> wrote:
>>>
>>>>
>>>> with the following sparksession configuration
>>>>
>>>> val spark = SparkSession.builder().master("local[*]").appName("Spark 
>>>> Session take").getOrCreate();
>>>>
>>>> this line works
>>>>
>>>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>>> "Canada").map(flight_row => flight_row).take(5)
>>>>
>>>>
>>>> however if change the master url like so, with the ip address then the
>>>> following error is produced by the position of .take(5)
>>>>
>>>> val spark = 
>>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>>> Session take").getOrCreate();
>>>>
>>>>
>>>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID
>>>> 1, 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>>>> instance of java.lang.invoke.SerializedLambda to field
>>>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
>>>> of org.apache.spark.rdd.MapPartitionsRDD
>>>>
>>>> BUT if I  remove take(5) or change the position of take(5) or insert an
>>>> extra take(5) as illustrated in code then it works. I don't see why the
>>>> position of take(5) should cause such an error or be caused by changing the
>>>> master url
>>>>
>>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>>> "Canada").map(flight_row => flight_row).take(5)
>>>>
>>>>   flights.take(5)
>>>>
>>>>   flights
>>>>   .take(5)
>>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count 
>>>> + 5))
>>>>flights.show(5)
>>>>
>>>>
>>>> complete code if you wish to replicate it.
>>>>
>>>> import org.apache.spark.sql.SparkSession
>>>>
>>>> object sessiontest {
>>>>
>>>>   // define specific  data type class then manipulate it using the filter 
>>>> and map functions
>>>>   // this is also known as an Encoder
>>>>   case class flight (DEST_COUNTRY_NAME: String,
>>>>  ORIGIN_COUNTRY_NAME:String,
>>>>  count: BigInt)
>>>>
>>>>
>>>>   def main(args:Array[String]): Unit ={
>>>>
>>>> val spark = 
>>>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>>>> Session take").getOrCreate();
>>>>
>>>> import spark.implicits._
>>>> val flightDf = 
>>>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>>>> val flights = flightDf.as[flight]
>>>>
>>>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>>>> "Canada").map(flight_row => flight_row).take(5)
>>>>
>>>>   flights.take(5)
>>>>
>>>>   flights
>>>>   .take(5)
>>>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>>>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>>>flights.show(5)
>>>>
>>>>   } // main
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Backbutton.co.uk
>>>> ¯\_(ツ)_/¯
>>>> ♡۶Java♡۶RMI ♡۶
>>>> Make Use Method {MUM}
>>>> makeuse.org
>>>> <http://www.backbutton.co.uk>
>>>>
>>>


Re: BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
I have configured  in IntelliJ as external jars
spark-3.0.0-preview2-bin-hadoop2.7/jar

not pulling anything from maven.

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org
<http://www.backbutton.co.uk>


On Fri, 27 Mar 2020 at 05:45, Wenchen Fan  wrote:

> Which Spark/Scala version do you use?
>
> On Fri, Mar 27, 2020 at 1:24 PM Zahid Rahman  wrote:
>
>>
>> with the following sparksession configuration
>>
>> val spark = SparkSession.builder().master("local[*]").appName("Spark Session 
>> take").getOrCreate();
>>
>> this line works
>>
>> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>> "Canada").map(flight_row => flight_row).take(5)
>>
>>
>> however, if I change the master URL like so, to use the IP address, then the
>> following error is produced by the position of .take(5):
>>
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>> Session take").getOrCreate();
>>
>>
>> 20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
>> 192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
>> instance of java.lang.invoke.SerializedLambda to field
>> org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
>> of org.apache.spark.rdd.MapPartitionsRDD
>>
>> BUT if I remove take(5), or change the position of take(5), or insert an
>> extra take(5) as illustrated in the code, then it works. I don't see why the
>> position of take(5) should cause such an error, or why it should be caused
>> by changing the master URL.
>>
>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>> "Canada").map(flight_row => flight_row).take(5)
>>
>>   flights.take(5)
>>
>>   flights
>>   .take(5)
>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 
>> 5))
>>flights.show(5)
>>
>>
>> complete code if you wish to replicate it.
>>
>> import org.apache.spark.sql.SparkSession
>>
>> object sessiontest {
>>
>>   // define specific  data type class then manipulate it using the filter 
>> and map functions
>>   // this is also known as an Encoder
>>   case class flight (DEST_COUNTRY_NAME: String,
>>  ORIGIN_COUNTRY_NAME:String,
>>  count: BigInt)
>>
>>
>>   def main(args:Array[String]): Unit ={
>>
>> val spark = 
>> SparkSession.builder().master("spark://192.168.0.38:7077").appName("Spark 
>> Session take").getOrCreate();
>>
>> import spark.implicits._
>> val flightDf = 
>> spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
>> val flights = flightDf.as[flight]
>>
>> flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != 
>> "Canada").map(flight_row => flight_row).take(5)
>>
>>   flights.take(5)
>>
>>   flights
>>   .take(5)
>>   .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
>>   .map(fr => flight(fr.DEST_COUNTRY_NAME, 
>> fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
>>flights.show(5)
>>
>>   } // main
>> }
>>
>>
>>
>>
>>
>> Backbutton.co.uk
>> ¯\_(ツ)_/¯
>> ♡۶Java♡۶RMI ♡۶
>> Make Use Method {MUM}
>> makeuse.org
>> <http://www.backbutton.co.uk>
>>
>


BUG: take with SparkSession.master[url]

2020-03-26 Thread Zahid Rahman
with the following sparksession configuration

val spark = SparkSession.builder().master("local[*]").appName("Spark
Session take").getOrCreate();

this line works

flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME !=
"Canada").map(flight_row => flight_row).take(5)


however, if I change the master URL like so, to use the IP address, then the
following error is produced by the position of .take(5):

val spark = SparkSession.builder()
  .master("spark://192.168.0.38:7077")
  .appName("Spark Session take")
  .getOrCreate()


20/03/27 05:15:20 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1,
192.168.0.38, executor 0): java.lang.ClassCastException: cannot assign
instance of java.lang.invoke.SerializedLambda to field
org.apache.spark.rdd.MapPartitionsRDD.f of type scala.Function3 in instance
of org.apache.spark.rdd.MapPartitionsRDD

BUT if I remove take(5), or change the position of take(5), or insert an
extra take(5) as illustrated in the code, then it works. I don't see why the
position of take(5) should cause such an error, or why it should be caused by
changing the master URL.

flights.take(5).filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME !=
"Canada").map(flight_row => flight_row).take(5)

  flights.take(5)

  flights
  .take(5)
  .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
  .map(fr => flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
   flights.show(5)


complete code if you wish to replicate it.

import org.apache.spark.sql.SparkSession

object sessiontest {

  // define a specific data type class, then manipulate it using the filter and map functions
  // this is also known as an Encoder
  case class flight (DEST_COUNTRY_NAME: String,
 ORIGIN_COUNTRY_NAME:String,
 count: BigInt)


  def main(args:Array[String]): Unit ={

val spark = SparkSession.builder()
  .master("spark://192.168.0.38:7077")
  .appName("Spark Session take")
  .getOrCreate()

import spark.implicits._
val flightDf =
spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightDf.as[flight]

flights.take(5).filter(flight_row =>
flight_row.ORIGIN_COUNTRY_NAME != "Canada").map(flight_row =>
flight_row).take(5)

  flights.take(5)

  flights
  .take(5)
  .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
  .map(fr => flight(fr.DEST_COUNTRY_NAME,
fr.ORIGIN_COUNTRY_NAME,fr.count + 5))
   flights.show(5)

  } // main
}
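
One possible reading of the difference (a hypothesis and a sketch, not a
confirmed diagnosis): take(5) returns a plain Array[flight] to the driver, so
filter and map applied to its result are ordinary local Scala collection
calls, whereas the same calls made on the Dataset are Spark transformations
whose lambdas must be serialized and shipped to the remote executors; if the
application's classes are not available on the spark://192.168.0.38:7077
cluster, that shipping step is where a SerializedLambda error like the one
above can surface. Reusing the definitions from the code above:

// Local: take(5) materialises an Array on the driver; these filter/map
// calls never leave the driver JVM.
val localRows: Array[flight] = flights.take(5)
val localSubset = localRows.filter(_.ORIGIN_COUNTRY_NAME != "Canada")

// Distributed: the same-looking calls on the Dataset are transformations;
// their closures travel to the executors before take(5) collects results.
val remoteSubset = flights
  .filter(_.ORIGIN_COUNTRY_NAME != "Canada")
  .map(flight_row => flight_row)
  .take(5)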





Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



results of taken(3) not appearing in console window

2020-03-26 Thread Zahid Rahman
I am running the same code with the same libraries but not getting the same
output.
scala>  case class flight (DEST_COUNTRY_NAME: String,
 |  ORIGIN_COUNTRY_NAME:String,
 |  count: BigInt)
defined class flight

scala> val flightDf =
spark.read.parquet("/data/flight-data/parquet/2010-summary.parquet/")
flightDf: org.apache.spark.sql.DataFrame = [DEST_COUNTRY_NAME: string,
ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> val flights = flightDf.as[flight]
flights: org.apache.spark.sql.Dataset[flight] = [DEST_COUNTRY_NAME: string,
ORIGIN_COUNTRY_NAME: string ... 1 more field]

scala> flights.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME !=
"Canada").map(flight_row => flight_row).take(3)

*res0: Array[flight] = Array(flight(United States,Romania,1), flight(United
States,Ireland,264), flight(United States,India,69))*
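
For what it is worth, the Array shown above as res0 is the REPL echoing the
value of the last expression; in a compiled program run outside the shell,
nothing prints that Array automatically. A minimal sketch, reusing the
definitions above, of making the results appear explicitly:

flights
  .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
  .take(3)
  .foreach(println)   // prints each flight(...) row to the console

// or let Spark format a preview itself
flights
  .filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
  .show(3)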


Re: Need to order iterator values in spark dataframe

2020-03-26 Thread Zahid Rahman
I believe I logged an issue first and I should get a response first.
I was ignored.

Regards

Did you know there are 8 million people in Kashmir locked up in their homes
by the Hindutva (Indians)
for 8 months?
Now the whole planet is locked up in their homes.
You didn't take notice of them either;
you ignored them.

Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Thu, 26 Mar 2020 at 17:24, Enrico Minack  wrote:

> Abhinav,
>
> you can repartition by your key, then sortWithinPartitions, and then
> groupByKey. Since the data are already hash-partitioned by key, Spark should
> not shuffle the data and hence not change the sort within each partition:
>
> ds.repartition($"key").sortWithinPartitions($"code").groupBy($"key")
>
> Enrico
>
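
A sketch of the repartition / sortWithinPartitions / group pattern Enrico
suggests, adapted to the groupByKey/mapGroups call from Abhinav's message
below. Whether the within-partition order survives the grouping is exactly
Enrico's claim, not something verified here; key_class and the column names
come from the message below, and spark is the active SparkSession:

import spark.implicits._

val statuses = df.as[key_class]
  .repartition($"key")             // hash-partition rows by key
  .sortWithinPartitions($"code")   // order rows inside each partition
  .groupByKey(_.key)               // per Enrico, this should reuse the existing partitioning
  .mapGroups { (key, rows) =>
    // rows are expected to arrive in the within-partition order set above;
    // the custom per-row logic goes here
    val status = rows.map(_ => "SUCCESS").toList
    (key, status)
  }
  .toDF("key", "status")
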
> On 26.03.20 at 17:53, Ranjan, Abhinav wrote:
>
> Hi,
>
> I have a dataframe which has data like:
>
> key |code|code_value
> 1|c1|11
> 1|c2|12
> 1|c2|9
> 1|c3|12
> 1|c2|13
> 1|c2|14
> 1|c4|12
> 1|c2|15
> 1|c1|12
>
>
> I need to group the data based on key and then apply some custom logic to
> every one of the values I get from grouping. So I did this:
>
> Let's suppose it is in a dataframe df.
>
> *case class key_class(key: string, code: string, code_value: string)*
>
>
> df
> .as[key_class]
> .groupByKey(_.key)
> .mapGroups {
>   (x, groupedValues) =>
> val status = groupedValues.map(row => {
>   // do some custom logic on row
>   ("SUCCESS")
> }).toList
>
> }.toDF("status")
>
>
> The issue with the above approach is that the values I get after applying
> groupByKey are not sorted/ordered. I want the values to be sorted by the
> column 'code'.
>
> There are ways to do this:
>
> 1. Get them in a list and then apply a sort ==> this will result in OOM if
> the iterator is too big.
>
> 2. Somehow apply a secondary sort, but the problem with that
> approach is I have to keep track of the key change.
>
> 3. sortWithinPartitions cannot be applied because groupBy will mess up the
> order.
>
> 4. Another approach is:
>
> df
> .as[key_class]
> .sort("key").sort("code")
> .map {
>  // do stuff here
> }
>
> but here also I have to keep track of the key change within the map
> function, and sometimes this also overflows if the keys are skewed.
>
>
>
> *So, is there any way in which I can get the values sorted after grouping
> them by a key?*
>
>
> *Thanks,*
>
>
> *Abhinav *
>
>
>


Programmatic: parquet file corruption error

2020-03-26 Thread Zahid Rahman
Hi,

When I run the code for a user-defined data type dataset, using a case class
in Scala, in the interactive spark-shell against a parquet file,
the results are as expected.
However, when I then run the same code programmatically in the IntelliJ IDE,
Spark gives a file corruption error.

Steps I have taken to determine the source of the error:
I have tested for file permissions and made sure to chmod 777, just in
case.
I tried a fresh copy of the same parquet file.
I ran the programme both before and after the fresh copy.
I also rebooted, then ran programmatically against a fresh parquet file.
The corruption error was consistent in all cases.
I have copied and pasted the spark-shell output, the error message, the code
in the IDE, the pom.xml, and the IntelliJ java classpath command line.

Perhaps the libraries used by spark-shell are different from the ones used
when running programmatically.
I don't believe it is an error on my part.
<--

07:28:45 WARN  CorruptStatistics:117 - Ignoring statistics because
created_by could not be parsed (see PARQUET-251): parquet-mr (build
32c46643845ea8a705c35d4ec8fc654cc8ff816d)
org.apache.parquet.VersionParser$VersionParseException: Could not parse
created_by: parquet-mr (build 32c46643845ea8a705c35d4ec8fc654cc8ff816d)
using format:
(.*?)\s+version\s*(?:([^(]*?)\s*(?:\(\s*build\s*([^)]*?)\s*\))?)?
at org.apache.parquet.VersionParser.parse(VersionParser.java:112)
at
org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics(CorruptStatistics.java:72)
at
org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatisticsInternal(ParquetMetadataConverter.java:435)
at
org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics(ParquetMetadataConverter.java:454)
at
org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetMetadata(ParquetMetadataConverter.java:914)
at
org.apache.parquet.format.converter.ParquetMetadataConverter.readParquetMetadata(ParquetMetadataConverter.java:885)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:532)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:505)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:499)
at
org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:448)
at
org.apache.spark.sql.execution.datasources.parquet.SpecificParquetRecordReaderBase.initialize(SpecificParquetRecordReaderBase.java:105)
at
org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.initialize(VectorizedParquetRecordReader.java:131)
at
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory.buildReaderBase(ParquetPartitionReaderFactory.scala:174)
at
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory.createVectorizedReader(ParquetPartitionReaderFactory.scala:205)
at
org.apache.spark.sql.execution.datasources.v2.parquet.ParquetPartitionReaderFactory.buildColumnarReader(ParquetPartitionReaderFactory.scala:103)
at
org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory.$anonfun$createColumnarReader$1(FilePartitionReaderFactory.scala:38)
at
org.apache.spark.sql.execution.datasources.v2.FilePartitionReaderFactory$$Lambda$2018/.apply(Unknown
Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:459)
at
org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.getNextReader(FilePartitionReader.scala:109)
at
org.apache.spark.sql.execution.datasources.v2.FilePartitionReader.next(FilePartitionReader.scala:42)
at
org.apache.spark.sql.execution.datasources.v2.DataSourceRDD$$anon$1.hasNext(DataSourceRDD.scala:62)
at
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown
Source)
at
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
Source)
at
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at
org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:726)
at
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:321)
at
org.apache.spark.sql.execution.SparkPlan$$Lambda$1879/.apply(Unknown
Source)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:872)
at
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:872)
at org.apache.spark.rdd.RDD$$Lambda$1875/.apply(Unknown
Source)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at 

Re: Unsubscribe

2020-03-22 Thread Zahid Rahman
To unsubscribe e-mail: dev-unsubscr...@spark.apache.org


Backbutton.co.uk
¯\_(ツ)_/¯
♡۶Java♡۶RMI ♡۶
Make Use Method {MUM}
makeuse.org



On Sun, 22 Mar 2020 at 16:16, Duan,Bing  wrote:

> Hi:
>  Plz  Unsubscribe me.
>
> Thanks!
>
> -
> To unsubscribe e-mail: dev-unsubscr...@spark.apache.org
>
>