RE: Regarding spark-3.2.0 decommission features.

2022-01-26 Thread Rao, Abhishek (Nokia - IN/Bangalore)
Hi Dongjoon Hyun,

Any inputs on the issue below would be helpful. Please let us know if we're
missing anything.

Thanks and Regards,
Abhishek

From: Patidar, Mohanlal (Nokia - IN/Bangalore) 
Sent: Thursday, January 20, 2022 11:58 AM
To: user@spark.apache.org
Subject: Suspected SPAM - RE: Regarding spark-3.2.0 decommission features.

Gentle reminder!!!

Br,
-Mohan Patidar



From: Patidar, Mohanlal (Nokia - IN/Bangalore)
Sent: Tuesday, January 18, 2022 2:02 PM
To: user@spark.apache.org
Cc: Rao, Abhishek (Nokia - IN/Bangalore) <abhishek@nokia.com>; Gowda Tp, Thimme
(Nokia - IN/Bangalore) <thimme.gowda...@nokia.com>; Sharma, Prakash
(Nokia - IN/Bangalore) <prakash.sha...@nokia.com>; Tarun, N (Nokia -
IN/Bangalore) <n.ta...@nokia.com>; Badagandi, Srinivas B.
(Nokia - IN/Bangalore) <srinivas.b.badaga...@nokia.com>
Subject: Regarding spark-3.2.0 decommission features.

Hi,
 We're using Spark 3.2.0 and we have enabled the Spark decommission
feature. As part of validating this feature, we wanted to check whether the RDD
blocks and shuffle blocks from the decommissioned executors are migrated to
other executors. However, we could not see this happening. Below is the
configuration we used (a spark-submit sketch showing how these flags might be
passed follows the driver logs in item 4).

  1.  Spark Configuration used:
 spark.local.dir /mnt/spark-ldir
 spark.decommission.enabled true
 spark.storage.decommission.enabled true
 spark.storage.decommission.rddBlocks.enabled true
 spark.storage.decommission.shuffleBlocks.enabled true
 spark.dynamicAllocation.enabled true
  2.  Brought up spark-driver and executors on the different nodes.

NAME                                            READY   STATUS        NODE
decommission-driver                             1/1     Running       Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-1   1/1     Running       Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-2   1/1     Running       Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-3   1/1     Running       Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-4   1/1     Running       Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-5   1/1     Running       Node1
  3.  Brought down Node2, after which the pod statuses were as follows.

NAME                                            READY   STATUS        NODE
decommission-driver                             1/1     Running       Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-1   1/1     Running       Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-2   1/1     Terminating   Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-3   1/1     Running       Node1
gzip-compression-test-ae0b0b7e4d7fbe40-exec-4   1/1     Terminating   Node2
gzip-compression-test-ae0b0b7e4d7fbe40-exec-5   1/1     Running       Node1
  4.  Driver logs:
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.296Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.459Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.564Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.601Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:55:28.667Z", 
"timezone":"UTC", "log":"Adding decommission script to lifecycle"}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.885Z", 
"timezone":"UTC", "log":"Notify executor 5 to decommissioning."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.887Z", 
"timezone":"UTC", "log":"Notify executor 1 to decommissioning."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.887Z", 
"timezone":"UTC", "log":"Notify executor 3 to decommissioning."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:58:21.887Z", 
"timezone":"UTC", "log":"Mark BlockManagers (BlockManagerId(5, X.X.X.X, 33359, 
None), BlockManagerId(1, X.X.X.X, 38655, None), BlockManagerId(3, X.X.X.X, 
35797, None)) as being decommissioning."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:59:24.426Z", 
"timezone":"UTC", "log":"Executor 2 is removed. Remove reason statistics: 
(gracefully decommissioned: 0, decommision unfinished: 0, driver killed: 0, 
unexpectedly exited: 1)."}
{"type":"log", "level":"INFO", "time":"2022-01-12T08:59:24.426Z", 
"timezone":"UTC", "log":"Executor 4 is removed. Remove reason statistics: 
(gracefully decommissioned: 0, 
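
For reference, a minimal sketch of how the flags in item 1 might be passed to
spark-submit on Kubernetes; the master URL, container image and application jar
are placeholders, not taken from the setup above:

spark-submit \
  --master k8s://https://<k8s-apiserver>:6443 \
  --deploy-mode cluster \
  --conf spark.kubernetes.container.image=<spark-image> \
  --conf spark.local.dir=/mnt/spark-ldir \
  --conf spark.decommission.enabled=true \
  --conf spark.storage.decommission.enabled=true \
  --conf spark.storage.decommission.rddBlocks.enabled=true \
  --conf spark.storage.decommission.shuffleBlocks.enabled=true \
  --conf spark.dynamicAllocation.enabled=true \
  local:///opt/spark/examples/jars/<application>.jar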

Re: question for definition of column types

2022-01-26 Thread Sean Owen
You can cast the cols as well. But are the columns strings to begin with?
They could also actually be doubles.
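
For instance, a minimal PySpark sketch of both options; df and data below are
assumed names for the DataFrame and raw rows from the quoted message:

from pyspark.sql.functions import col

# Option 1: cast the existing string columns to double.
df_cast = (df
    .withColumn("salary", col("salary").cast("double"))
    .withColumn("rate", col("rate").cast("double"))
    .withColumn("insurance", col("insurance").cast("double")))

# Option 2: supply the values as Python floats so DoubleType is inferred.
data = [("buck trends", "ceo", 20.0, 0.25, 100.0)]
df_inferred = spark.createDataFrame(
    data, ["name", "title", "salary", "rate", "insurance"])

df_cast.printSchema()      # salary, rate, insurance: double
df_inferred.printSchema()  # salary, rate, insurance: double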

On Wed, Jan 26, 2022 at 8:49 PM  wrote:

> when creating dataframe from a list, how can I specify the col type?
>
> such as:
>
> >>> df =
> >>>
> spark.createDataFrame(list,["name","title","salary","rate","insurance"])
> >>> df.show()
> +-----------+---------+------+----+---------+
> |       name|    title|salary|rate|insurance|
> +-----------+---------+------+----+---------+
> |buck trends|      ceo|    20|0.25|      100|
> |cindy banks|      cfo|    17|0.22|      120|
> |  joe coder|developer|    13| 0.2|      120|
> +-----------+---------+------+----+---------+
>
>
> >>> df.describe()
> DataFrame[summary: string, name: string, title: string, salary: string,
> rate: string, insurance: string]
>
> I want the salary, rate, insurance to be Double type, not a String type.
>
> Thank you.
> Frakass
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: question for definition of column types

2022-01-26 Thread Peyman Mohajerian
from pyspark.sql.types import *

list =[("buck trends", "ceo", 20.00, 0.25, "100")]

schema = StructType([ StructField("name", StringType(), True),
  StructField("title", StringType(), True),
  StructField("salary", DoubleType(), True),
  StructField("rate", DoubleType(), True),
  StructField("insurance", StringType(), True)
])

df = spark.createDataFrame(data=list, schema=schema)
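
A quick check of the result (a sketch; output abbreviated):

df.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- title: string (nullable = true)
#  |-- salary: double (nullable = true)
#  |-- rate: double (nullable = true)
#  |-- insurance: string (nullable = true)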

On Wed, Jan 26, 2022 at 6:49 PM  wrote:

> when creating dataframe from a list, how can I specify the col type?
>
> such as:
>
> >>> df =
> >>>
> spark.createDataFrame(list,["name","title","salary","rate","insurance"])
> >>> df.show()
> +-----------+---------+------+----+---------+
> |       name|    title|salary|rate|insurance|
> +-----------+---------+------+----+---------+
> |buck trends|      ceo|    20|0.25|      100|
> |cindy banks|      cfo|    17|0.22|      120|
> |  joe coder|developer|    13| 0.2|      120|
> +-----------+---------+------+----+---------+
>
>
> >>> df.describe()
> DataFrame[summary: string, name: string, title: string, salary: string,
> rate: string, insurance: string]
>
> I want the salary, rate, insurance to be Double type, not a String type.
>
> Thank you.
> Frakass
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


question for definition of column types

2022-01-26 Thread capitnfrakass

When creating a dataframe from a list, how can I specify the column type?

such as:

df = 
spark.createDataFrame(list,["name","title","salary","rate","insurance"])

df.show()

+-----------+---------+------+----+---------+
|       name|    title|salary|rate|insurance|
+-----------+---------+------+----+---------+
|buck trends|      ceo|    20|0.25|      100|
|cindy banks|      cfo|    17|0.22|      120|
|  joe coder|developer|    13| 0.2|      120|
+-----------+---------+------+----+---------+



df.describe()
DataFrame[summary: string, name: string, title: string, salary: string, 
rate: string, insurance: string]


I want the salary, rate, insurance to be Double type, not a String type.

Thank you.
Frakass

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Migration to Spark 3.2

2022-01-26 Thread Stephen Coy
Hi Aurélien!

Please run

mvn dependency:tree

and check it for Jackson dependencies.

Feel free to respond with the output if you have any questions about it.
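
To narrow the output to the conflicting artifact, the tree can also be filtered
(a sketch; the group/artifact pattern below is the usual Jackson one, adjust as
needed):

mvn dependency:tree -Dincludes=com.fasterxml.jackson.core:jackson-databind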

Cheers,

Steve C

> On 22 Jan 2022, at 10:49 am, Aurélien Mazoyer  wrote:
>
> Hello,
>
> I migrated my code to Spark 3.2 and I am facing some issues. When I run my 
> unit tests via Maven, I get this error:
> java.lang.NoClassDefFoundError: Could not initialize class 
> org.apache.spark.rdd.RDDOperationScope$
> which is not super nice.
>
> However, when I run my test via Intellij, I get the following one:
> java.lang.ExceptionInInitializerError
> at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
> at org.apache.spark.rdd.RDD.map(RDD.scala:421)
> ...
> Caused by: com.fasterxml.jackson.databind.JsonMappingException: Scala module 
> 2.12.3 requires Jackson Databind version >= 2.12.0 and < 2.13.0
> which is far better IMO since it gives me some clue about what is missing in my
> pom.xml file to make it work. After adding a few more dependencies, my tests
> pass again in IntelliJ, but I am stuck on the same error when running the
> Maven command :-/.
> It seems that the JDK and Maven versions are the same and both use the same
> .m2 directory.
> Any clue on what can be going wrong?
>
> Thank you,
>
> Aurelien


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



unsubscribe

2022-01-26 Thread Lucas Schroeder Rossi
unsubscribe

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: [Spark UDF]: Where does UDF stores temporary Arrays/Sets

2022-01-26 Thread Sean Owen
Really depends on what your UDF is doing. You could read 2GB of XML into
much more than that as a DOM representation in memory.
Remember 15GB of executor memory is shared across tasks.
You need to get a handle on how much memory your code uses in the first place
before you can reason about whether that's enough.

On Wed, Jan 26, 2022 at 10:03 AM Abhimanyu Kumar Singh <
abhimanyu.kr.sing...@gmail.com> wrote:

> Thanks for your quick response.
>
> For some reasons I can't use spark-xml (schema related issue).
>
> I've tried reducing number of tasks per executor by increasing the number
> of executors, but it still throws same error.
>
> I can't understand why does even 15gb of executor memory is not sufficient
> to parse just 2gb XML file.
> How can I check the max amount of JVM memory utilised for each task?
>
> Do I need to tweak some other configurations for increasing JVM memory
> rather than spark.executor.memory?
>
> On Wed, Jan 26, 2022, 9:23 PM Sean Owen  wrote:
>
>> Executor memory used shows data that is cached, not the VM usage. You're
>> running out of memory somewhere, likely in your UDF, which probably parses
>> massive XML docs as a DOM first or something. Use more memory, fewer tasks
>> per executor, or consider using spark-xml if you are really just parsing
>> pieces of it. It'll be more efficient.
>>
>> On Wed, Jan 26, 2022 at 9:47 AM Abhimanyu Kumar Singh <
>> abhimanyu.kr.sing...@gmail.com> wrote:
>>
>>> I'm doing some complex operations inside spark UDF (parsing huge XML).
>>>
>>> Dataframe:
>>> | value |
>>> | Content of XML File 1 |
>>> | Content of XML File 2 |
>>> | Content of XML File N |
>>>
>>> val df = Dataframe.select(UDF_to_parse_xml(value))
>>>
>>> UDF looks something like:
>>>
>>> val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
>>> val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct
>>>
>>> Parsing requires creation and de-duplication of arrays from the XML
>>> containing
>>> around 0.1 million elements (consisting of MyClass(Strings, Maps,
>>> Integers,  )).
>>>
>>> In the Spark UI "executor memory used" is barely 60-70 MB. But still
>>> Spark processing fails
>>> with *ExecutorLostFailure *error for XMLs of size around 2GB.
>>> When I increase the executor size (say 15GB to 25 GB) it works fine. One
>>> partition can contain only
>>> one XML file (with max size 2GB) and 1 task/executor runs in parallel.
>>>
>>> *My question is which memory is being used by UDF for storing arrays,
>>> maps or sets while parsing?*
>>> *And how can I configure it?*
>>>
>>> Should I increase *spark*.*memory*.*offHeap*.size,
>>> spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?
>>>
>>> Thanks a lot,
>>> Abhimanyu
>>>
>>> PS: I know I shouldn't use UDF this way, but I don't have any other
>>> alternative here.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>


Re: [Spark UDF]: Where does UDF stores temporary Arrays/Sets

2022-01-26 Thread Abhimanyu Kumar Singh
Thanks for your quick response.

For some reasons I can't use spark-xml (schema related issue).

I've tried reducing the number of tasks per executor by increasing the number
of executors, but it still throws the same error.

I can't understand why even 15 GB of executor memory is not sufficient
to parse just a 2 GB XML file.
How can I check the max amount of JVM memory utilised for each task?

Do I need to tweak some other configurations for increasing JVM memory
rather than spark.executor.memory?

On Wed, Jan 26, 2022, 9:23 PM Sean Owen  wrote:

> Executor memory used shows data that is cached, not the VM usage. You're
> running out of memory somewhere, likely in your UDF, which probably parses
> massive XML docs as a DOM first or something. Use more memory, fewer tasks
> per executor, or consider using spark-xml if you are really just parsing
> pieces of it. It'll be more efficient.
>
> On Wed, Jan 26, 2022 at 9:47 AM Abhimanyu Kumar Singh <
> abhimanyu.kr.sing...@gmail.com> wrote:
>
>> I'm doing some complex operations inside spark UDF (parsing huge XML).
>>
>> Dataframe:
>> | value |
>> | Content of XML File 1 |
>> | Content of XML File 2 |
>> | Content of XML File N |
>>
>> val df = Dataframe.select(UDF_to_parse_xml(value))
>>
>> UDF looks something like:
>>
>> val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
>> val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct
>>
>> Parsing requires creation and de-duplication of arrays from the XML
>> containing
>> around 0.1 million elements (consisting of MyClass(Strings, Maps,
>> Integers,  )).
>>
>> In the Spark UI "executor memory used" is barely 60-70 MB. But still
>> Spark processing fails
>> with *ExecutorLostFailure *error for XMLs of size around 2GB.
>> When I increase the executor size (say 15GB to 25 GB) it works fine. One
>> partition can contain only
>> one XML file (with max size 2GB) and 1 task/executor runs in parallel.
>>
>> *My question is which memory is being used by UDF for storing arrays,
>> maps or sets while parsing?*
>> *And how can I configure it?*
>>
>> Should I increase *spark*.*memory*.*offHeap*.size,
>> spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?
>>
>> Thanks a lot,
>> Abhimanyu
>>
>> PS: I know I shouldn't use UDF this way, but I don't have any other
>> alternative here.
>>
>>
>>
>>
>>
>>
>>
>>


Re: [Spark UDF]: Where does UDF stores temporary Arrays/Sets

2022-01-26 Thread Sean Owen
Executor memory used shows data that is cached, not the VM usage. You're
running out of memory somewhere, likely in your UDF, which probably parses
massive XML docs as a DOM first or something. Use more memory, fewer tasks
per executor, or consider using spark-xml if you are really just parsing
pieces of it. It'll be more efficient.
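
As an illustration of the spark-xml route, a minimal PySpark sketch; the
package coordinates, row tag and path are assumptions, not taken from the
thread:

from pyspark.sql import SparkSession

# Assumes the spark-xml package is available,
# e.g. com.databricks:spark-xml_2.12:0.14.0.
spark = (SparkSession.builder
    .appName("xml-parsing")
    .config("spark.jars.packages", "com.databricks:spark-xml_2.12:0.14.0")
    .getOrCreate())

# Extract only the elements of interest instead of building a DOM of the
# whole document; "record" and the path are hypothetical placeholders.
df = (spark.read.format("xml")
    .option("rowTag", "record")
    .load("/path/to/xml/files"))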

On Wed, Jan 26, 2022 at 9:47 AM Abhimanyu Kumar Singh <
abhimanyu.kr.sing...@gmail.com> wrote:

> I'm doing some complex operations inside spark UDF (parsing huge XML).
>
> Dataframe:
> | value |
> | Content of XML File 1 |
> | Content of XML File 2 |
> | Content of XML File N |
>
> val df = Dataframe.select(UDF_to_parse_xml(value))
>
> UDF looks something like:
>
> val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
> val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct
>
> Parsing requires creation and de-duplication of arrays from the XML
> containing
> around 0.1 million elements (consisting of MyClass(Strings, Maps,
> Integers,  )).
>
> In the Spark UI "executor memory used" is barely 60-70 MB. But still Spark
> processing fails
> with *ExecutorLostFailure *error for XMLs of size around 2GB.
> When I increase the executor size (say 15GB to 25 GB) it works fine. One
> partition can contain only
> one XML file (with max size 2GB) and 1 task/executor runs in parallel.
>
> *My question is which memory is being used by UDF for storing arrays, maps
> or sets while parsing?*
> *And how can I configure it?*
>
> Should I increase *spark*.*memory*.*offHeap*.size,
> spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?
>
> Thanks a lot,
> Abhimanyu
>
> PS: I know I shouldn't use UDF this way, but I don't have any other
> alternative here.
>
>
>
>
>
>
>
>


[Spark UDF]: Where does UDF stores temporary Arrays/Sets

2022-01-26 Thread Abhimanyu Kumar Singh
I'm doing some complex operations inside spark UDF (parsing huge XML).

Dataframe:
| value |
| Content of XML File 1 |
| Content of XML File 2 |
| Content of XML File N |

val df = Dataframe.select(UDF_to_parse_xml(value))

UDF looks something like:

val XMLelements : Array[MyClass1] = getXMLelements(xmlContent)
val myResult: Array[MyClass2] = XMLelements.map(myfunction).distinct

Parsing requires creation and de-duplication of arrays from the XML
containing
around 0.1 million elements (consisting of MyClass(Strings, Maps, Integers,
 )).

In the Spark UI "executor memory used" is barely 60-70 MB. But still Spark
processing fails
with *ExecutorLostFailure *error for XMLs of size around 2GB.
When I increase the executor size (say 15GB to 25 GB) it works fine. One
partition can contain only
one XML file (with max size 2GB) and 1 task/executor runs in parallel.

*My question is which memory is being used by UDF for storing arrays, maps
or sets while parsing?*
*And how can I configure it?*

Should I increase *spark*.*memory*.*offHeap*.size,
spark.yarn.executor.memoryOverhead or spark.executor.memoryOverhead?

Thanks a lot,
Abhimanyu

PS: I know I shouldn't use UDF this way, but I don't have any other
alternative here.
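
For reference, the knobs mentioned above as spark-submit flags; the values are
placeholders, not recommendations:

spark-submit \
  --conf spark.executor.memory=25g \
  --conf spark.executor.memoryOverhead=4g \
  ...

spark.executor.memoryOverhead (which supersedes the older
spark.yarn.executor.memoryOverhead name) covers memory used outside the JVM
heap, while objects built inside a UDF, such as the arrays above, live on the
JVM heap and come out of spark.executor.memory; spark.memory.offHeap.size only
applies when off-heap memory is explicitly enabled.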