Re: How to populate all possible combination values in columns using Spark SQL

2020-05-06 Thread Sonal Goyal
As mentioned in the comments on SO, can you provide a (masked) sample of
the data? It will be easier to see what you are trying to do if you add the
year column.

Thanks,
Sonal
Nube Technologies 






On Thu, May 7, 2020 at 10:26 AM Aakash Basu 
wrote:

> Hi,
>
> I've described the problem on Stack Overflow in a lot of detail; could
> you kindly take a look and help if possible?
>
> https://stackoverflow.com/q/61643910/5536733
>
> I'd be absolutely fine if someone solves it using Spark SQL APIs rather
> than a plain Spark SQL query.
>
> Thanks,
> Aakash.
>


How to populate all possible combination values in columns using Spark SQL

2020-05-06 Thread Aakash Basu
Hi,

I've described the problem on Stack Overflow in a lot of detail; could
you kindly take a look and help if possible?

https://stackoverflow.com/q/61643910/5536733

I'd be absolutely fine if someone solves it using Spark SQL APIs rather
than a plain Spark SQL query.

Thanks,
Aakash.
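
For readers without the SO details handy, here is a generic, hedged sketch of one common reading of the problem: build every combination of two key columns by cross-joining their distinct values, then left-join the original data so missing combinations surface as nulls. The table and column names below are made up, not the poster's.

import org.apache.spark.sql.SparkSession

object AllCombinations {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("combos").getOrCreate()
    import spark.implicits._

    // Made-up sample data (hypothetical columns).
    val sales = Seq(("A", 2019, 10), ("A", 2020, 15), ("B", 2020, 7))
      .toDF("product", "year", "amount")

    // Every (product, year) combination, whether or not it occurs in the data.
    val allCombos = sales.select("product").distinct()
      .crossJoin(sales.select("year").distinct())

    // Left join back so absent combinations appear with null amounts.
    allCombos.join(sales, Seq("product", "year"), "left")
      .orderBy("product", "year")
      .show()

    spark.stop()
  }
}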


Cyber bullying for reporting bugs

2020-05-06 Thread JeffEvans1112
@Jeff Evans

@Sean Owen

Both of these postings are examples of the same object-oriented concept.

They are examples of extracting a child object from a parent object.

The difference is that when a Muslim asked, he was told by Jeff Evans:

"we are not here handhold  you."

“do a simple Google search”

“They're not being paid to handhold you and quickly answer to your every whim.”

COMPARATIVELY

BUT when the good Dr Mich Talebzadeh asked the same, there was no humiliation or
offensive comments.

No comments at all.

Hi,

Thank you all,

I am just thinking of passing that date, 06/04/2020 12:03:43, and getting the
correct format back from the module. In effect, use

this date format, yyyy-MM-dd'T'HH:mm:ss.SZ, as the pattern;

in other words, rather than new Date(), pass "06/04/2020 12:03:43" as a string.

Regards,

Dr Mich Talebzadeh

// Assumed imports for this snippet (DateTime here is Joda-Time):
import org.joda.time.DateTime
import java.text.SimpleDateFormat
import java.util.Locale

val fixedStr = "2020-06-04T12:03:43"
val dt = new DateTime(fixedStr)      // parse the ISO string
val jdkDate = dt.toDate()            // convert to java.util.Date

val pattern3 = "dd  MM HH:mm:ss.SZ"
val simpleDateFormat3 = new SimpleDateFormat(pattern3, new Locale("en", "UK"))
val date3 = simpleDateFormat3.format(jdkDate)
System.out.println(date3)
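
A minimal sketch of what Mich seems to be after, assuming the input "06/04/2020 12:03:43" is dd/MM/yyyy and using the UK locale (both are guesses): parse the string instead of calling new Date(), then re-format it with the ISO-style pattern.

import java.text.SimpleDateFormat
import java.util.Locale

val input = "06/04/2020 12:03:43"
val inputFormat = new SimpleDateFormat("dd/MM/yyyy HH:mm:ss", Locale.UK)
val outputFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SZ", Locale.UK)

val parsed = inputFormat.parse(input)   // java.util.Date
println(outputFormat.format(parsed))    // e.g. 2020-04-06T12:03:43.000+0100 (depends on default time zone)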

On Sat, 28 Mar 2020, 15:50 Jeff Evans, <[hidden email]> wrote:

Dude, you really need to chill. Have you ever worked with a large open source 
project before? It seems not. Even so, insinuating there are tons of bugs that 
were left uncovered until you came along (despite the fact that the project is 
used by millions across many different organizations) is ludicrous. Learn a 
little bit of humility.

If you're new to something, assume you have made a mistake rather than that 
there is a bug. Lurk a bit more, or even do a simple Google search, and you 
will realize Sean is a very senior committer (i.e. expert) in Spark, and has 
been for many years. He, and everyone else participating in these lists, is 
doing it voluntarily on their own time. They're not being paid to handhold you 
and quickly answer to your every whim.

As you can see from the code:

STEP 1: I create an object, a static DataFrame, which holds all the information
about the data source (csv files).

STEP 2: Then I create a variable called staticSchema, assigning it the schema
of the original static DataFrame.

STEP 3: Then I create another variable, val streamingDataFrame, via
spark.readStream.

Into the .schema function I pass the staticSchema object, which is meant to hold
the information about the csv files, including the .load(path), etc.

So when I create val streamingDataFrame and pass it .schema(staticSchema),
the variable streamingDataFrame should have all the information.

I should only have to call .option("maxFilesPerTrigger", 1) and not .format("csv")
.option("header","true").load("/data/retail-data/by-day/*.csv").

Otherwise, what is the point of passing .schema(staticSchema) to
streamingDataFrame?

You can replicate it using the complete code below.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{window, column, desc, col}

object RetailData {

  def main(args: Array[String]): Unit = {

    // create spark session
    val spark = SparkSession.builder()
      .master("spark://192.168.0.38:7077")
      .appName("Retail Data")
      .getOrCreate()

    // set spark runtime configuration
    spark.conf.set("spark.sql.shuffle.partitions", "5")
    spark.conf.set("spark.sql.streaming.forceDeleteTempCheckpointLocation", "True")

    // create a static frame
    val staticDataFrame = spark.read.format("csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("/data/retail-data/by-day/*.csv")

    staticDataFrame.createOrReplaceTempView("retail_data")
    val staticSchema = staticDataFrame.schema

    staticDataFrame
      .selectExpr(
        "CustomerId", "UnitPrice * Quantity as total_cost", "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")
      .sort(desc("sum(total_cost)"))
      .show(2)

    val streamingDataFrame = spark.readStream
      .schema(staticSchema)
      .format("csv")
      .option("maxFilesPerTrigger", 1)
      .option("header", "true")
      .load("/data/retail-data/by-day/*.csv")

    println(streamingDataFrame.isStreaming)

    // lazy operation so we will need to call a streaming action to start the action
    val purchaseByCustomerPerHour = streamingDataFrame
      .selectExpr(
        "CustomerId",
        "(UnitPrice * Quantity) as total_cost",
        "InvoiceDate")
      .groupBy(col("CustomerId"), window(col("InvoiceDate"), "1 day"))
      .sum("total_cost")

    // stream action to write to console
    purchaseByCustomerPerHour.writeStream
      .format("console")
      .queryName("customer_purchases")
      .outputMode("complete")
      .start()

  } // main

} // object


cyber bullying by so...@apache.org

2020-05-06 Thread JeffEvans1112

Re: How to unsubscribe

2020-05-06 Thread Denny Lee
Hi Fred,

To unsubscribe, could you please email: user-unsubscr...@spark.apache.org
(for more information, please refer to
https://spark.apache.org/community.html).

Thanks!
Denny


On Wed, May 6, 2020 at 10:12 AM Fred Liu  wrote:

> Hi guys
>
>
>
> -
>
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>
>
>
>
> *From:* Fred Liu 
> *Sent:* Wednesday, May 6, 2020 10:10 AM
> *To:* user@spark.apache.org
> *Subject:* Unsubscribe
>
>
>
>
>
>
>
>


Abstract of child object from Parent Object

2020-05-06 Thread JeffEvans

Re: Which SQL flavor does Spark SQL follow?

2020-05-06 Thread Mich Talebzadeh
It closely follows Hive SQL.

Its analytical functions are similar to Oracle's. Anyway, if you know good SQL
(as opposed to being a Java programmer turned SQL writer) you should be OK.

HTH

Dr Mich Talebzadeh



LinkedIn * 
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
*



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Wed, 6 May 2020 at 21:35, Aakash Basu  wrote:

> Hi,
>
> I wish to know which type of SQL syntax is followed when we write a plain
> SQL query inside spark.sql. Is it MySQL or PGSQL? I know it isn't SQL
> Server or Oracle, as I had to convert a lot of SQL functions while migrating.
>
> Also, if you can point to documentation which clearly states the above, that
> would help.
>
> Thanks,
> AB
>
>
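
A small illustration, not from this thread, of the Hive-flavored syntax spark.sql accepts; the table and column names are made up, and the analytic (window) function behaves much like Oracle's.

import org.apache.spark.sql.SparkSession

object SqlFlavorDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sql-flavor").getOrCreate()
    import spark.implicits._

    Seq(("A", 10), ("A", 15), ("B", 7)).toDF("product", "amount")
      .createOrReplaceTempView("sales")

    // Hive-style backtick identifiers plus an analytic (window) function.
    spark.sql(
      """SELECT `product`,
        |       amount,
        |       rank() OVER (PARTITION BY product ORDER BY amount DESC) AS rnk
        |FROM sales""".stripMargin).show()

    spark.stop()
  }
}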


Re: Which SQL flavor does Spark SQL follow?

2020-05-06 Thread Jeff Evans
https://docs.databricks.com/spark/latest/spark-sql/language-manual/index.html

https://spark.apache.org/docs/latest/api/sql/index.html

On Wed, May 6, 2020 at 3:35 PM Aakash Basu 
wrote:

> Hi,
>
> I wish to know which type of SQL syntax is followed when we write a plain
> SQL query inside spark.sql. Is it MySQL or PGSQL? I know it isn't SQL
> Server or Oracle, as I had to convert a lot of SQL functions while migrating.
>
> Also, if you can point to documentation which clearly states the above, that
> would help.
>
> Thanks,
> AB
>
>


Which SQL flavor does Spark SQL follow?

2020-05-06 Thread Aakash Basu
Hi,

I wish to know which type of SQL syntax is followed when we write a plain
SQL query inside spark.sql. Is it MySQL or PGSQL? I know it isn't SQL
Server or Oracle, as I had to convert a lot of SQL functions while migrating.

Also, if you can point to documentation which clearly states the above, that
would help.

Thanks,
AB


How to unsubscribe

2020-05-06 Thread Fred Liu
Hi guys


-

To unsubscribe e-mail: 
user-unsubscr...@spark.apache.org



From: Fred Liu 
Sent: Wednesday, May 6, 2020 10:10 AM
To: user@spark.apache.org
Subject: Unsubscribe


[External E-mail]

CAUTION: This email originated from outside the organization. Do not click 
links or open attachments unless you recognize the sender and know the content 
is safe.




Unsubscribe

2020-05-06 Thread Fred Liu



Re: Pyspark Kafka Structured Stream not working.

2020-05-06 Thread Jungtaek Lim
Hi,

1. You seem to use DStream (Spark Streaming), not Structured Streaming.
2. I'm not familiar with pyspark, but it looks like the error message is very
clear - Kafka doesn't allow such a name for "client.id". The error message
spells out the naming rule, so you need to adopt that convention
(e.g. no spaces).

Hope this helps,

Thanks,
Jungtaek Lim (HeartSaVioR)

On Wed, May 6, 2020 at 5:36 PM Vijayant Kumar
 wrote:

> Hi All,
>
>
>
> I am getting the below error while using Pyspark Structured Streaming from
> Kafka Producer.
>
>
>
> 20/05/06 11:51:16 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Error starting receiver 0 - kafka.common.InvalidConfigException:
> client.id Python Kafka streamer is illegal, contains a character other
> than ASCII alphanumerics, '.', '_' and '-'
>
>
>
> I am using the below code to get the messages:
>
>
>
> broker='vm105:2181'
>
> topic='Hello-Kafka'
>
> print 'broker topic is ',broker,topic
>
> kvs = KafkaUtils.createStream(ssc, \
>
>   broker, \
>
>   "Python Kafka streamer",{topic:1})
>
>
>
> And my Submit command is like below :-
>
> *spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar
> test_kafka.py vm105:2181 Hello-Kafka*
>
>
>
> Can anyone tell me what I am missing?
>
>
>
> Thanks,
>
> Vijayant
>
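
For illustration, a minimal sketch in the Scala DStream API (not the poster's PySpark code) showing a name that satisfies the rule, i.e. only ASCII alphanumerics, '.', '_' and '-'. The broker and topic values echo the ones in the post; everything else is assumed.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils   // spark-streaming-kafka-0-8, as in the post

object KafkaNamingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("kafka-naming")
    val ssc = new StreamingContext(conf, Seconds(5))

    // No spaces in the consumer/group name: "Python-Kafka-streamer" instead of "Python Kafka streamer".
    val stream = KafkaUtils.createStream(ssc, "vm105:2181", "Python-Kafka-streamer", Map("Hello-Kafka" -> 1))
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}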


Pyspark Kafka Structured Stream not working.

2020-05-06 Thread Vijayant Kumar
Hi All,

I am getting the below error while using Pyspark Structured Streaming from 
Kafka Producer.

20/05/06 11:51:16 ERROR ReceiverTracker: Deregistered receiver for stream 0: 
Error starting receiver 0 - kafka.common.InvalidConfigException: client.id 
Python Kafka streamer is illegal, contains a character other than ASCII 
alphanumerics, '.', '_' and '-'

I am using the below code to get the messages:

broker='vm105:2181'
topic='Hello-Kafka'
print 'broker topic is ',broker,topic
kvs = KafkaUtils.createStream(ssc, \
  broker, \
  "Python Kafka streamer",{topic:1})

And my submit command is like below:
spark-submit --jars spark-streaming-kafka-0-8-assembly_2.11-2.4.4.jar 
test_kafka.py vm105:2181 Hello-Kafka

Can anyone tell me what I am missing?

Thanks,
Vijayant

This e-mail message may contain confidential or proprietary information of 
Mavenir Systems, Inc. or its affiliates and is intended solely for the use of 
the intended recipient(s). If you are not the intended recipient of this 
message, you are hereby notified that any review, use or distribution of this 
information is absolutely prohibited and we request that you delete all copies 
in your control and contact us by e-mailing to secur...@mavenir.com. This 
message contains the views of its author and may not necessarily reflect the 
views of Mavenir Systems, Inc. or its affiliates, who employ systems to monitor 
email messages, but make no representation that such messages are authorized, 
secure, uncompromised, or free from computer viruses, malware, or other 
defects. Thank You


Re: Spark hangs while reading from jdbc - does nothing Removing Guess work from trouble shooting

2020-05-06 Thread Ruijing Li
Wanted to update everyone on this - thanks for all the responses. I was able
to solve this issue after taking a jstack dump; I found out this was the
cause:

https://github.com/scala/bug/issues/10436

Lesson learned - I'll use a safer JSON parser like json4s, which should
hopefully be thread-safe.
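
As a reference, a minimal, hedged sketch of json4s usage (the JSON payload and field names are made up); each parse call builds its own immutable JValue, which sidesteps the kind of shared-state issue referenced in the bug above.

import org.json4s._
import org.json4s.jackson.JsonMethods.parse

object Json4sSketch {
  def main(args: Array[String]): Unit = {
    implicit val formats: Formats = DefaultFormats

    val raw = """{"table":"orders","batchId":42}"""  // made-up payload
    val json = parse(raw)                            // immutable JValue

    val table = (json \ "table").extract[String]
    val batchId = (json \ "batchId").extract[Int]
    println(s"table=$table batchId=$batchId")
  }
}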

On Fri, Apr 24, 2020 at 4:34 AM Waleed Fateem 
wrote:

> Are you running this in local mode? If not, are you even sure that the
> hanging is occurring on the driver's side?
>
> Did you check the Spark UI to see if there is a straggler task or not? If
> you do have a straggler/hanging task, and in case this is not an
> application running in local mode then you need to get the Java thread dump
> of the executor's JVM process. Once you do, you'll want to review the 
> "Executor
> task launch worker for task XYZ" thread, where XYZ is some integer value
> representing the task ID that was launched on that executor. In case you're 
> running
> this in local mode, that thread would be located in the same Java thread
> dump that you have already collected.
>
>
> On Tue, Apr 21, 2020 at 9:51 PM Ruijing Li  wrote:
>
>> I apologize, but I cannot share it, even if it is just typical spark
>> libraries. I definitely understand that limits debugging help, but wanted
>> to understand if anyone has encountered a similar issue.
>>
>> On Tue, Apr 21, 2020 at 7:12 PM Jungtaek Lim <
>> kabhwan.opensou...@gmail.com> wrote:
>>
>>> If there's no third party libraries in the dump then why not share the
>>> thread dump? (I mean, the output of jstack)
>>>
>>> stack trace would be more helpful to find which thing acquired lock and
>>> which other things are waiting for acquiring lock, if we suspect deadlock.
>>>
>>> On Wed, Apr 22, 2020 at 2:38 AM Ruijing Li 
>>> wrote:
>>>
 After refreshing a couple of times, I notice the lock is being swapped
 between these 3. The other 2 will be blocked by whoever gets this lock, in
 a cycle of 160 has lock -> 161 -> 159 -> 160

 On Tue, Apr 21, 2020 at 10:33 AM Ruijing Li 
 wrote:

> In thread dump, I do see this
> - SparkUI-160- acceptor-id-ServerConnector@id(HTTP/1.1) | RUNNABLE |
> Monitor
> - SparkUI-161-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
> Blocked by Thread(Some(160)) Lock
> -  SparkUI-159-acceptor-id-ServerConnector@id(HTTP/1.1) | BLOCKED |
> Blocked by Thread(Some(160)) Lock
>
> Could the fact that 160 has the monitor but is not running be causing
> a deadlock preventing the job from finishing?
>
> I do see my Finalizer and main method are waiting. I don’t see any
> other threads from 3rd party libraries or my code in the dump. I do see
> spark context cleaner has timed waiting.
>
> Thanks
>
>
> On Tue, Apr 21, 2020 at 9:58 AM Ruijing Li 
> wrote:
>
>> Strangely enough I found an old issue that is the exact same issue as
>> mine
>>
>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-18343
>>
>> However I’m using spark 2.4.4 so the issue should have been solved by
>> now.
>>
>> Like the user in the jira issue I am using mesos, but I am reading
>> from oracle instead of writing to Cassandra and S3.
>>
>>
>> On Thu, Apr 16, 2020 at 1:54 AM ZHANG Wei 
>> wrote:
>>
>>> The Thread dump result table of Spark UI can provide some clues to
>>> find out thread locks issue, such as:
>>>
>>>   Thread ID | Thread Name  | Thread State | Thread
>>> Locks
>>>   13| NonBlockingInputStreamThread | WAITING  | Blocked
>>> by Thread Some(48) Lock(jline.internal.NonBlockingInputStream@103008951
>>> })
>>>   48| Thread-16| RUNNABLE |
>>> Monitor(jline.internal.NonBlockingInputStream@103008951})
>>>
>>> And echo thread row can show the call stacks after being clicked,
>>> then you can check the root cause of holding locks like this(Thread 48 
>>> of
>>> above):
>>>
>>>   org.fusesource.jansi.internal.Kernel32.ReadConsoleInputW(Native
>>> Method)
>>>
>>> org.fusesource.jansi.internal.Kernel32.readConsoleInputHelper(Kernel32.java:811)
>>>
>>> org.fusesource.jansi.internal.Kernel32.readConsoleKeyInput(Kernel32.java:842)
>>>
>>> org.fusesource.jansi.internal.WindowsSupport.readConsoleInput(WindowsSupport.java:97)
>>>   jline.WindowsTerminal.readConsoleInput(WindowsTerminal.java:222)
>>>   
>>>
>>> Hope it can help you.
>>>
>>> --
>>> Cheers,
>>> -z
>>>
>>> On Thu, 16 Apr 2020 16:36:42 +0900
>>> Jungtaek Lim  wrote:
>>>
>>> > Take thread dumps continuously, at a specific period (like 1s), and see the
>>> > change of stack / lock for each thread. (This is not easy to do in the UI,
>>> > so maybe doing it manually would be the only option. Not sure Spark
>>> > UI will
>>> > 

Re: Good idea to do multi-threading in spark job?

2020-05-06 Thread Ruijing Li
Thanks for the answer Sean!

On Sun, May 3, 2020 at 10:35 AM Sean Owen  wrote:

> Spark will by default assume each task needs 1 CPU. On an executor
> with 16 cores and 16 slots, you'd schedule 16 tasks. If each is using
> 4 cores, then 64 threads are trying to run. If you're CPU-bound, that
> could slow things down. But to the extent some of tasks take some time
> blocking on I/O, it could increase overall utilization. You shouldn't
> have to worry about Spark there, but, you do have to consider that N
> tasks, each with its own concurrency, maybe executing your code in one
> JVM, and whatever synchronization that implies.
>
> On Sun, May 3, 2020 at 11:32 AM Ruijing Li  wrote:
> >
> > Hi all,
> >
> > We have a spark job (spark 2.4.4, hadoop 2.7, scala 2.11.12) where we
> use semaphores / parallel collections within our spark job. We definitely
> notice a huge speedup in our job from doing this, but were wondering if
> this could cause any unintended side effects? Particularly I’m worried
> about any deadlocks and if it could mess with the fixes for issues such as
> this
> > https://issues.apache.org/jira/plugins/servlet/mobile#issue/SPARK-26961
> >
> > We do run with multiple cores.
> >
> > Thanks!
> > --
> > Cheers,
> > Ruijing Li
>
-- 
Cheers,
Ruijing Li
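
To make Sean's point concrete, here is a minimal, self-contained sketch, not code from this thread: a parallel collection inside a single task, so an executor with several task slots may run many more threads than slots in one JVM. The master URL, partition counts and slowLookup function are made-up stand-ins; on Scala 2.11/2.12 (as in this thread), .par needs no extra imports.

import org.apache.spark.sql.SparkSession

object ParallelInsideTask {
  // Hypothetical stand-in for a blocking I/O call.
  def slowLookup(id: Long): Long = { Thread.sleep(10); id * 2 }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("par-demo").getOrCreate()
    val sc = spark.sparkContext

    val out = sc.parallelize(1L to 1000L, numSlices = 4).mapPartitions { it =>
      // .par gives this single task its own thread pool; with 4 task slots the JVM
      // may now run far more than 4 threads, and any shared state touched here
      // must be synchronized by us, not by Spark.
      it.grouped(50).flatMap(batch => batch.par.map(slowLookup).toList.iterator)
    }
    println(out.count())
    spark.stop()
  }
}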