Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-04-01 Thread Mich Talebzadeh
Good stuff Khalid.

I have created a section in the Apache Spark Community Slack called
spark-foundation (spark-foundation - Apache Spark Community - Slack).


I invite you to add your weblink to that section.

HTH
Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Sat, 1 Apr 2023 at 13:12, Khalid Mammadov 
wrote:

> Hey AN-TRUONG
>
> I have got some articles about this subject that should help.
> E.g.
> https://khalidmammadov.github.io/spark/spark_internals_rdd.html
>
> Also check other Spark Internals on web.
>
> Regards
> Khalid
>
> On Fri, 31 Mar 2023, 16:29 AN-TRUONG Tran Phan, 
> wrote:
>
>> Thank you for your information,
>>
>> I have tracked the spark history server on port 18080 and the spark UI on
>> port 4040. I see the result of these two tools as similar right?
>>
>> I want to know what each Task ID (Example Task ID 0, 1, 3, 4, 5, ) in
>> the images does, is it possible?
>> https://i.stack.imgur.com/Azva4.png
>>
>> Best regards,
>>
>> An - Truong
>>
>>
>> On Fri, Mar 31, 2023 at 9:38 PM Mich Talebzadeh <
>> mich.talebza...@gmail.com> wrote:
>>
>>> Are you familiar with spark GUI default on port 4040?
>>>
>>> have a look.
>>>
>>> HTH
>>>
>>> Mich Talebzadeh,
>>> Lead Solutions Architect/Engineering Lead
>>> Palantir Technologies Limited
>>>
>>>
>>>view my Linkedin profile
>>> 
>>>
>>>
>>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>>
>>>
>>>
>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>> any loss, damage or destruction of data or any other property which may
>>> arise from relying on this email's technical content is explicitly
>>> disclaimed. The author will in no case be liable for any monetary damages
>>> arising from such loss, damage or destruction.
>>>
>>>
>>>
>>>
>>> On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan <
>>> tr.phan.tru...@gmail.com> wrote:
>>>
 Hi,

 I am learning about Apache Spark and want to know the meaning of each
 Task created on the Jobs recorded on Spark history.

 For example, the application I write creates 17 jobs, in which job 0
 runs for 10 minutes, there are 2384 small tasks and I want to learn about
 the meaning of these 2384, is it possible?

 I found a picture of DAG in the Jobs and want to know the relationship
 between DAG and Task, is it possible (Specifically from the attached file
 DAG and 2384 tasks below)?

 Thank you very much, have a nice day everyone.

 Best regards,

 An-Trường.

 -
 To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>
>> --
>> Trân Trọng,
>>
>> An Trường.
>>
>


Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-04-01 Thread Khalid Mammadov
Hey AN-TRUONG

I have got some articles about this subject that should help.
E.g.
https://khalidmammadov.github.io/spark/spark_internals_rdd.html

Also check the other Spark Internals articles on the web.

Regards
Khalid

On Fri, 31 Mar 2023, 16:29 AN-TRUONG Tran Phan, 
wrote:

> Thank you for your information,
>
> I have tracked the spark history server on port 18080 and the spark UI on
> port 4040. I see the result of these two tools as similar right?
>
> I want to know what each Task ID (Example Task ID 0, 1, 3, 4, 5, ) in
> the images does, is it possible?
> https://i.stack.imgur.com/Azva4.png
>
> Best regards,
>
> An - Truong
>
>
> On Fri, Mar 31, 2023 at 9:38 PM Mich Talebzadeh 
> wrote:
>
>> Are you familiar with spark GUI default on port 4040?
>>
>> have a look.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan <
>> tr.phan.tru...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am learning about Apache Spark and want to know the meaning of each
>>> Task created on the Jobs recorded on Spark history.
>>>
>>> For example, the application I write creates 17 jobs, in which job 0
>>> runs for 10 minutes, there are 2384 small tasks and I want to learn about
>>> the meaning of these 2384, is it possible?
>>>
>>> I found a picture of DAG in the Jobs and want to know the relationship
>>> between DAG and Task, is it possible (Specifically from the attached file
>>> DAG and 2384 tasks below)?
>>>
>>> Thank you very much, have a nice day everyone.
>>>
>>> Best regards,
>>>
>>> An-Trường.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> Trân Trọng,
>
> An Trường.
>


Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-03-31 Thread Mich Talebzadeh
Yes, the history server refers to completed jobs; port 4040 shows the
currently running jobs.

You should also capture screenshots of the Executors and Stages tabs.
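
As a rough illustration of how jobs, stages and tasks relate (a minimal Scala
sketch, not the original poster's application): each action triggers a job, the
job's DAG is cut into stages at shuffle boundaries, and each stage runs one
task per partition, which is where counts such as 2384 come from.

// Minimal sketch: one action -> one job, split into two stages at the
// shuffle, with one task per partition in each stage.
import org.apache.spark.sql.SparkSession

object JobStageTaskDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("job-stage-task-demo")
      .master("local[4]")
      .getOrCreate()

    // 8 input partitions -> 8 tasks in the map stage.
    val rdd = spark.sparkContext.parallelize(1 to 1000000, 8)

    // reduceByKey introduces a shuffle boundary in the DAG;
    // 4 shuffle partitions -> 4 tasks in the reduce stage.
    val counts = rdd.map(x => (x % 10, 1)).reduceByKey(_ + _, 4)

    counts.count()        // the action that actually submits the job

    // Keep the application alive for a minute so the live UI on
    // http://localhost:4040 (Jobs/Stages tabs) can be inspected.
    Thread.sleep(60000)
    spark.stop()
  }
}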

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 31 Mar 2023 at 16:17, AN-TRUONG Tran Phan 
wrote:

> Thank you for your information,
>
> I have tracked the spark history server on port 18080 and the spark UI on
> port 4040. I see the result of these two tools as similar right?
>
> I want to know what each Task ID (Example Task ID 0, 1, 3, 4, 5, ) in
> the images does, is it possible?
> https://i.stack.imgur.com/Azva4.png
>
> Best regards,
>
> An - Truong
>
>
> On Fri, Mar 31, 2023 at 9:38 PM Mich Talebzadeh 
> wrote:
>
>> Are you familiar with spark GUI default on port 4040?
>>
>> have a look.
>>
>> HTH
>>
>> Mich Talebzadeh,
>> Lead Solutions Architect/Engineering Lead
>> Palantir Technologies Limited
>>
>>
>>view my Linkedin profile
>> 
>>
>>
>>  https://en.everybodywiki.com/Mich_Talebzadeh
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan <
>> tr.phan.tru...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am learning about Apache Spark and want to know the meaning of each
>>> Task created on the Jobs recorded on Spark history.
>>>
>>> For example, the application I write creates 17 jobs, in which job 0
>>> runs for 10 minutes, there are 2384 small tasks and I want to learn about
>>> the meaning of these 2384, is it possible?
>>>
>>> I found a picture of DAG in the Jobs and want to know the relationship
>>> between DAG and Task, is it possible (Specifically from the attached file
>>> DAG and 2384 tasks below)?
>>>
>>> Thank you very much, have a nice day everyone.
>>>
>>> Best regards,
>>>
>>> An-Trường.
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>
> --
> Trân Trọng,
>
> An Trường.
>


Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-03-31 Thread AN-TRUONG Tran Phan
Thank you for the information.

I have been tracking the Spark history server on port 18080 and the Spark UI on
port 4040. Am I right that these two tools show similar results?

I want to know what each Task ID (for example Task ID 0, 1, 3, 4, 5, ...) in
the images does. Is that possible?
https://i.stack.imgur.com/Azva4.png
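
For completed applications the two tools should indeed show the same jobs,
stages and tasks, provided the application writes event logs that the history
server can replay. A minimal sketch of that wiring (the log directory below is
only an illustrative path, not taken from this thread):

// Sketch: enable event logging so the history server (default port 18080)
// can replay the same Jobs/Stages/Tasks pages the live 4040 UI shows.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("history-server-demo")
  .config("spark.eventLog.enabled", "true")              // write events while running
  .config("spark.eventLog.dir", "hdfs:///spark-events")  // illustrative path
  .getOrCreate()

// The history server must point spark.history.fs.logDirectory at the same
// location to pick these logs up after the application finishes.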

Best regards,

An - Truong


On Fri, Mar 31, 2023 at 9:38 PM Mich Talebzadeh 
wrote:

> Are you familiar with spark GUI default on port 4040?
>
> have a look.
>
> HTH
>
> Mich Talebzadeh,
> Lead Solutions Architect/Engineering Lead
> Palantir Technologies Limited
>
>
>view my Linkedin profile
> 
>
>
>  https://en.everybodywiki.com/Mich_Talebzadeh
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan <
> tr.phan.tru...@gmail.com> wrote:
>
>> Hi,
>>
>> I am learning about Apache Spark and want to know the meaning of each
>> Task created on the Jobs recorded on Spark history.
>>
>> For example, the application I write creates 17 jobs, in which job 0 runs
>> for 10 minutes, there are 2384 small tasks and I want to learn about the
>> meaning of these 2384, is it possible?
>>
>> I found a picture of DAG in the Jobs and want to know the relationship
>> between DAG and Task, is it possible (Specifically from the attached file
>> DAG and 2384 tasks below)?
>>
>> Thank you very much, have a nice day everyone.
>>
>> Best regards,
>>
>> An-Trường.
>>
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>

-- 
Trân Trọng,

An Trường.


Re: Help me learn about JOB TASK and DAG in Apache Spark

2023-03-31 Thread Mich Talebzadeh
Are you familiar with the Spark GUI, which runs on port 4040 by default?

Have a look.

HTH

Mich Talebzadeh,
Lead Solutions Architect/Engineering Lead
Palantir Technologies Limited


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 31 Mar 2023 at 15:15, AN-TRUONG Tran Phan 
wrote:

> Hi,
>
> I am learning about Apache Spark and want to know the meaning of each Task
> created on the Jobs recorded on Spark history.
>
> For example, the application I write creates 17 jobs, in which job 0 runs
> for 10 minutes, there are 2384 small tasks and I want to learn about the
> meaning of these 2384, is it possible?
>
> I found a picture of DAG in the Jobs and want to know the relationship
> between DAG and Task, is it possible (Specifically from the attached file
> DAG and 2384 tasks below)?
>
> Thank you very much, have a nice day everyone.
>
> Best regards,
>
> An-Trường.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Help needed regarding error with 5 node Spark cluster (shuffle error)- Comcast

2023-01-30 Thread Artemis User
Not sure where you got the property name "spark.memory.offHeap.use". The
correct one should be "spark.memory.offHeap.enabled". See
https://spark.apache.org/docs/latest/configuration.html#spark-properties
for details.
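
For reference, a minimal sketch of the corrected setting (the values are the
ones quoted in the original spark-submit command; the application name is only
illustrative):

// Sketch: the corrected off-heap properties expressed via SparkSession
// instead of spark-submit flags.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("streaming-job")                          // illustrative name
  .config("spark.memory.offHeap.enabled", "true")    // correct property name
  .config("spark.memory.offHeap.size", "3g")
  .getOrCreate()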


On 1/30/23 10:12 AM, Jain, Sanchi wrote:


I am not sure if this is the intended DL for reaching out for help. 
Please redirect to the right DL


*From: *Jain, Sanchi 
*Date: *Monday, January 30, 2023 at 10:10 AM
*To: *priv...@spark.apache.org 
*Subject: *Request for access to create a jira account- Comcast

Hello there

I am a principal engineer at Comcast and my team is currently working 
on building a standalone Spark cluster on a 5 node Linux cluster 
environment. We are running into roadblocks due to the following error 
observed when a Spark streaming application is submitted to a remote 
master.


org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0 partition 11
    at org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1705)
    at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10(MapOutputTracker.scala:1652)
    at org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10$adapted(MapOutputTracker.scala:1651)
    at scala.collection.Iterator.foreach(Iterator.scala:943)
    at scala.collection.Iterator.foreach$(Iterator.scala:943)

Here are the other details of the environment configuration –

Software version - spark-3.3.1-bin-hadoop3

Scala version – scala_2.12.15

Total memory assigned to the worker nodes – 14.5 GB (2 GB used)

CPU/Memory assigned to each node – 4 cores/16 GB

Driver memory – 4 G

Executor memory – 3G

Spark-submit command used –

/tmp/spark-3.3.1-bin-hadoop3/bin/spark-submit --master "spark://:7077" \
  --conf spark.submit.deployMode=client \
  --conf spark.executor.instances=4 \
  --conf spark.executor.memory=3g \
  --conf spark.driver.memory=4g \
  --conf spark.memory.offHeap.use=true \
  --conf spark.memory.offHeap.size=3g \
  --conf spark.sql.broadcastTimeout=300s \
  --conf spark.sql.autoBroadcastThreshold=1g \
  --class  ./.jar


We would really appreciate it if we could be assigned a Jira account for
submitting an issue in this regard, or if we could reach out to the ASF
community for help.


Thanks

Sanchita Jain

sanchita_j...@comcast.com



Re: Help needed regarding error with 5 node Spark cluster (shuffle error)- Comcast

2023-01-30 Thread Mich Talebzadeh
Hi,

Identify the cause of the shuffle failure. Also, how are you using HDFS here?

https://community.cloudera.com/t5/Support-Questions/Spark-Metadata-Fetch-Failed-Exception-Missing-an-output/td-p/203771

HTH


   view my Linkedin profile



 https://en.everybodywiki.com/Mich_Talebzadeh



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 30 Jan 2023 at 15:15, Jain, Sanchi 
wrote:

> I am not sure if this is the intended DL for reaching out for help. Please
> redirect to the right DL
>
>
>
> *From: *Jain, Sanchi 
> *Date: *Monday, January 30, 2023 at 10:10 AM
> *To: *priv...@spark.apache.org 
> *Subject: *Request for access to create a jira account- Comcast
>
> Hello there
>
>
>
> I am a principal engineer at Comcast and my team is currently working on
> building a standalone Spark cluster on a 5 node Linux cluster environment.
> We are running into roadblocks due to the following error observed when a
> Spark streaming application is submitted to a remote master.
>
>
>
> org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 0 partition 11
>
> at
> org.apache.spark.MapOutputTracker$.validateStatus(MapOutputTracker.scala:1705)
>
> at
> org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10(MapOutputTracker.scala:1652)
>
> at
> org.apache.spark.MapOutputTracker$.$anonfun$convertMapStatuses$10$adapted(MapOutputTracker.scala:1651)
>
> at scala.collection.Iterator.foreach(Iterator.scala:943)
>
> at scala.collection.Iterator.foreach$(Iterator.scala:943)
>
>
>
> Here are the other details of the environment configuration –
>
> Software version - spark-3.3.1-bin-hadoop3
>
> Scala version – scala_2.12.15
>
> Total memory assigned to the worker nodes – 14.5 GB (2 GB used)
>
> CPU/Memory assigned to each node – 4 cores/16 GB
>
> Driver memory – 4 G
>
> Executor memory – 3G
>
>
>
> Spark-submit command used –
>
>
>
> /tmp/spark-3.3.1-bin-hadoop3/bin/spark-submit --master
> "spark://:7077" --conf spark.submit.deployMode=client --conf
> spark.executor.instances=4 --conf spark.executor.memory=3g --conf
> spark.driver.memory=4g --conf spark.memory.offHeap.use=true --conf
> spark.memory.offHeap.size=3g --conf spark.sql.broadcastTimeout=300s --conf
> spark.sql.autoBroadcastThreshold=1g  --class 
> ./.jar
>
>
>
> We will really appreciate if we can be assigned a jira account for
> submitting an issue in this regard or if we can reach out to the ASF
> community for help.
>
>
>
> Thanks
>
> Sanchita Jain
>
> sanchita_j...@comcast.com
>
>
>
>
>


Re: Help with Shuffle Read performance

2022-09-30 Thread Igor Calabria
Thanks a lot for the answers, folks.

It turned out that Spark was just IOPS-starved. Using better disks solved
my issue, so it was nothing related to Kubernetes at all.

Have a nice weekend everyone

On Fri, Sep 30, 2022 at 4:27 PM Artemis User  wrote:

> The reduce phase is always more resource-intensive than the map phase.
> Couple of suggestions you may want to consider:
>
>1. Setting the number of partitions to 18K may be way too high (the
>default number is only 200).  You may want to just use the default and the
>scheduler will automatically increase the partitions if needed.
>2. Turn on dynamic resource allocation (DRA) (
>
> https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation).
>It would allow those executors that finish the map tasks returning the
>resources (e.g. RAM, CPU cores) back to the cluster, and reallocate the
>resources to the reduce tasks.  This feature (post Spark 3.0) is also
>available to K8, but turned off by default.
>3. With DRA turned on, you may want also try to play with a small
>number of number of executors/nodes thus reducing shuffling needs, given
>the fact that you only have 128GB RAM.
>
> Hope this helps...
>
> On 9/29/22 2:12 PM, Igor Calabria wrote:
>
> Hi Everyone,
>
> I'm running spark 3.2 on kubernetes and have a job with a decently sized
> shuffle of almost 4TB. The relevant cluster config is as follows:
>
> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
> - 128 GB RAM
> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>
> The job runs fine but I'm bothered by how underutilized the cluster gets
> during the reduce phase. During the map(reading data from s3 and writing
> the shuffle data) CPU usage, disk throughput and network usage is as
> expected, but during the reduce phase it gets really low. It seems the main
> bottleneck is reading shuffle data from other nodes, task statistics
> reports values ranging from 25s to several minutes(the task sizes are
> really close, they aren't skewed). I've tried increasing
> "spark.reducer.maxSizeInFlight" and
> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
> a little, but not enough to saturate the cluster resources.
>
> Did I miss some more tuning parameters that could help?
> One obvious thing would be to vertically increase the machines and use
> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
> considering 30x30 connections.
>
> Thanks in advance!
>
>
>


Re: Help with Shuffle Read performance

2022-09-30 Thread Artemis User
The reduce phase is always more resource-intensive than the map phase. A
couple of suggestions you may want to consider:


1. Setting the number of partitions to 18K may be way too high (the
   default is only 200). You may want to just use the default and let
   the scheduler automatically increase the partitions if needed.
2. Turn on dynamic resource allocation (DRA)
   (https://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation).
   It would allow executors that finish their map tasks to return their
   resources (e.g. RAM, CPU cores) to the cluster, so they can be
   reallocated to the reduce tasks. This feature (post Spark 3.0) is also
   available on K8s, but turned off by default. See the sketch after this
   list.
3. With DRA turned on, you may also want to try playing with a small
   number of executors/nodes, thus reducing shuffling needs, given the
   fact that you only have 128GB RAM.

Hope this helps...
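
A minimal sketch of point 2 above for a Kubernetes deployment (shuffle
tracking is needed there because there is no external shuffle service by
default; the executor bounds are illustrative, not recommendations):

// Illustrative dynamic resource allocation settings for Spark on K8s.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dra-demo")
  .config("spark.dynamicAllocation.enabled", "true")
  // No external shuffle service on K8s by default, so executors that still
  // hold shuffle data are tracked instead of being released immediately:
  .config("spark.dynamicAllocation.shuffleTracking.enabled", "true")
  .config("spark.dynamicAllocation.minExecutors", "5")    // illustrative
  .config("spark.dynamicAllocation.maxExecutors", "30")   // illustrative
  .getOrCreate()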


On 9/29/22 2:12 PM, Igor Calabria wrote:

Hi Everyone,

I'm running spark 3.2 on kubernetes and have a job with a decently 
sized shuffle of almost 4TB. The relevant cluster config is as follows:


- 30 Executors. 16 physical cores, configured with 32 Cores for spark
- 128 GB RAM
-  shuffle.partitions is 18k which gives me tasks of around 150~180MB

The job runs fine but I'm bothered by how underutilized the cluster 
gets during the reduce phase. During the map(reading data from s3 and 
writing the shuffle data) CPU usage, disk throughput and network usage 
is as expected, but during the reduce phase it gets really low. It 
seems the main bottleneck is reading shuffle data from other nodes, 
task statistics reports values ranging from 25s to several minutes(the 
task sizes are really close, they aren't skewed). I've tried 
increasing "spark.reducer.maxSizeInFlight" and 
"spark.shuffle.io.numConnectionsPerPeer" and it did improve 
performance by a little, but not enough to saturate the cluster resources.


Did I miss some more tuning parameters that could help?
One obvious thing would be to vertically increase the machines and use 
less nodes to minimize traffic, but 30 nodes doesn't seem like much 
even considering 30x30 connections.


Thanks in advance!



Re: Help with Shuffle Read performance

2022-09-30 Thread Leszek Reimus
Hi Sungwoo,

I tend to agree - for a new system, I would probably not go that route, as
Spark on Kubernetes is getting there and can do a lot already. Issue I
mentioned before can be fixed with proper node fencing - it is a typical
stateful set problem Kubernetes has without fencing - node goes down, k8s
refuses to relocate stateful set pod because it did not get confirmation
that old pod was shut down or not, since lack of communication != shut down
- it can be split brain too. Since it operates on "at most one" principle -
it will not allow 2nd pod up.

But keeping the 2nd layer of resource abstraction has its pluses as well.
From a k8s perspective you allocate memory/cpu as a block to the entire
YARN/Spark, making the allocation easier. In the end - ability to support
Spark 1.6, MR, Spark 2.4 and Spark 3.1 jobs in one system is what made it
for us.

It also makes it easier when you want full data locality with short-circuit
HDFS reads. As far as I know, Spark on Kubernetes stopped talking about
that - and in my experience data locality can speed up Spark by a large
factor.

Sincerely,

Leszek Reimus




On Fri, Sep 30, 2022 at 2:25 AM Sungwoo Park  wrote:

> Hi Leszek,
>
> For running YARN on Kubernetes and then running Spark on YARN, is there a
> lot of overhead for maintaining YARN on Kubernetes? I thought people
> usually want to move from YARN to Kubernetes because of the overhead of
> maintaining Hadoop.
>
> Thanks,
>
> --- Sungwoo
>
>
> On Fri, Sep 30, 2022 at 1:37 PM Leszek Reimus 
> wrote:
>
>> Hi Everyone,
>>
>> To add my 2 cents here:
>>
>> Advantage of containers, to me, is that it leaves the host system
>> pristine and clean, allowing standardized devops deployment of hardware for
>> any purpose. Way back before - when using bare metal / ansible, reusing hw
>> always involved full reformat of base system. This alone is worth the ~1-2%
>> performance tax cgroup containers have.
>>
>> Advantage of kubernetes is more on the deployment side of things. Unified
>> deployment scripts that can be written by devs. Same deployment yaml (or
>> helm chart) can be used on local Dev Env / QA / Integration Env and finally
>> Prod (with some tweaks).
>>
>> Depending on the networking CNI, and storage backend - Kubernetes can
>> have a very close to bare metal performance. In the end it is always a
>> trade-off. You gain some, you pay with extra overhead.
>>
>> I'm running YARN on kubernetes and mostly run Spark on top of YARN (some
>> legacy MapReduce jobs too though) . Finding it much more manageable to
>> allocate larger memory/cpu chunks to yarn pods and then have run
>> auto-scaler to scale out YARN if needed; than to manage individual
>> memory/cpu requirements on Spark on Kubernetes deployment.
>>
>> As far as I tested, Spark on Kubernetes is immature when reliability is
>> concerned (or maybe our homegrown k8s does not do fencing/STONITH well
>> yet). When a node dies / goes down, I find executors not getting
>> rescheduled to other nodes - the driver just gets stuck for the executors
>> to come back. This does not happen on YARN / Standalone deployment (even
>> when ran on same k8s cluster)
>>
>> Sincerely,
>>
>> Leszek Reimus
>>
>>
>>
>>
>> On Thu, Sep 29, 2022 at 7:06 PM Gourav Sengupta <
>> gourav.sengu...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> dont containers finally run on systems, and the only advantage of
>>> containers is that you can do better utilisation of system resources by
>>> micro management of jobs running in it? Some say that containers have their
>>> own binaries which isolates environment, but that is a lie, because in a
>>> kubernetes environments that is running your SPARK jobs you will have the
>>> same environment for all your kubes.
>>>
>>> And as you can see there are several other configurations, disk
>>> mounting, security, etc issues to handle as an overhead as well.
>>>
>>> And the entire goal of all those added configurations is that someone in
>>> your devops team feels using containers makes things more interesting
>>> without any real added advantage to large volume jobs.
>>>
>>> But I may be wrong, and perhaps we need data, and not personal attacks
>>> like the other person in the thread did.
>>>
>>> In case anyone does not know EMR does run on containers as well, and in
>>> EMR running on EC2 nodes you can put all your binaries in containers and
>>> use those for running your jobs.
>>>
>>> Regards,
>>> Gourav Sengupta
>>>
>>> On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus 
>>> wrote:
>>>
 Igor,

 what exact instance types do you use? Unless you use local instance
 storage and have actually configured your Kubernetes and Spark to use
 instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
 investigate that by going to an instance, then to volume, and see
 monitoring charts.

 Another thought is that you're essentially giving 4GB per core. That
 sounds pretty low, in my experience.



 On Thu, 

Re: Help with Shuffle Read performance

2022-09-30 Thread Sungwoo Park
Hi Leszek,

For running YARN on Kubernetes and then running Spark on YARN, is there a
lot of overhead for maintaining YARN on Kubernetes? I thought people
usually want to move from YARN to Kubernetes because of the overhead of
maintaining Hadoop.

Thanks,

--- Sungwoo


On Fri, Sep 30, 2022 at 1:37 PM Leszek Reimus 
wrote:

> Hi Everyone,
>
> To add my 2 cents here:
>
> Advantage of containers, to me, is that it leaves the host system pristine
> and clean, allowing standardized devops deployment of hardware for any
> purpose. Way back before - when using bare metal / ansible, reusing hw
> always involved full reformat of base system. This alone is worth the ~1-2%
> performance tax cgroup containers have.
>
> Advantage of kubernetes is more on the deployment side of things. Unified
> deployment scripts that can be written by devs. Same deployment yaml (or
> helm chart) can be used on local Dev Env / QA / Integration Env and finally
> Prod (with some tweaks).
>
> Depending on the networking CNI, and storage backend - Kubernetes can have
> a very close to bare metal performance. In the end it is always a
> trade-off. You gain some, you pay with extra overhead.
>
> I'm running YARN on kubernetes and mostly run Spark on top of YARN (some
> legacy MapReduce jobs too though) . Finding it much more manageable to
> allocate larger memory/cpu chunks to yarn pods and then have run
> auto-scaler to scale out YARN if needed; than to manage individual
> memory/cpu requirements on Spark on Kubernetes deployment.
>
> As far as I tested, Spark on Kubernetes is immature when reliability is
> concerned (or maybe our homegrown k8s does not do fencing/STONITH well
> yet). When a node dies / goes down, I find executors not getting
> rescheduled to other nodes - the driver just gets stuck for the executors
> to come back. This does not happen on YARN / Standalone deployment (even
> when ran on same k8s cluster)
>
> Sincerely,
>
> Leszek Reimus
>
>
>
>
> On Thu, Sep 29, 2022 at 7:06 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> dont containers finally run on systems, and the only advantage of
>> containers is that you can do better utilisation of system resources by
>> micro management of jobs running in it? Some say that containers have their
>> own binaries which isolates environment, but that is a lie, because in a
>> kubernetes environments that is running your SPARK jobs you will have the
>> same environment for all your kubes.
>>
>> And as you can see there are several other configurations, disk mounting,
>> security, etc issues to handle as an overhead as well.
>>
>> And the entire goal of all those added configurations is that someone in
>> your devops team feels using containers makes things more interesting
>> without any real added advantage to large volume jobs.
>>
>> But I may be wrong, and perhaps we need data, and not personal attacks
>> like the other person in the thread did.
>>
>> In case anyone does not know EMR does run on containers as well, and in
>> EMR running on EC2 nodes you can put all your binaries in containers and
>> use those for running your jobs.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus 
>> wrote:
>>
>>> Igor,
>>>
>>> what exact instance types do you use? Unless you use local instance
>>> storage and have actually configured your Kubernetes and Spark to use
>>> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
>>> investigate that by going to an instance, then to volume, and see
>>> monitoring charts.
>>>
>>> Another thought is that you're essentially giving 4GB per core. That
>>> sounds pretty low, in my experience.
>>>
>>>
>>>
>>> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria 
>>> wrote:
>>>
 Hi Everyone,

 I'm running spark 3.2 on kubernetes and have a job with a decently
 sized shuffle of almost 4TB. The relevant cluster config is as follows:

 - 30 Executors. 16 physical cores, configured with 32 Cores for spark
 - 128 GB RAM
 -  shuffle.partitions is 18k which gives me tasks of around 150~180MB

 The job runs fine but I'm bothered by how underutilized the cluster
 gets during the reduce phase. During the map(reading data from s3 and
 writing the shuffle data) CPU usage, disk throughput and network usage is
 as expected, but during the reduce phase it gets really low. It seems the
 main bottleneck is reading shuffle data from other nodes, task statistics
 reports values ranging from 25s to several minutes(the task sizes are
 really close, they aren't skewed). I've tried increasing
 "spark.reducer.maxSizeInFlight" and
 "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
 a little, but not enough to saturate the cluster resources.

 Did I miss some more tuning parameters that could help?
 One obvious thing would be to vertically increase the machines and use
 less nodes to minimize traffic, but 30 nodes doesn't 

Re: Help with Shuffle Read performance

2022-09-29 Thread Gourav Sengupta
Hi Leszek,

Spot on. The fact that EMR clusters are created, dynamically scaled up and
down, and kept ephemeral proves that there is actually no advantage in using
containers for large jobs.

It is utterly pointless, and I have attended interviews and workshops where
no one has ever been able to prove its value in place of EMR.

That said, if you have a large system and want to use two different
containers in it for two different jobs with different binaries in them,
then containers are the way to go. They may also be useful for jobs which
require low memory and disk space and need low-latency response times, like
real-time processing - but in that case why use Spark and not something like
Kafka SQL, etc.?

So this entire containers nonsense is utterly pointless, and useless in
every way, other than devops engineers making pointless arguments to create
and spend huge budgets, as very few organisations need low-latency,
variable-throughput, multiple-binary hosting systems.


Regards,
Gourav

On Fri, Sep 30, 2022 at 5:32 AM Leszek Reimus 
wrote:

> Hi Everyone,
>
> To add my 2 cents here:
>
> Advantage of containers, to me, is that it leaves the host system pristine
> and clean, allowing standardized devops deployment of hardware for any
> purpose. Way back before - when using bare metal / ansible, reusing hw
> always involved full reformat of base system. This alone is worth the ~1-2%
> performance tax cgroup containers have.
>
> Advantage of kubernetes is more on the deployment side of things. Unified
> deployment scripts that can be written by devs. Same deployment yaml (or
> helm chart) can be used on local Dev Env / QA / Integration Env and finally
> Prod (with some tweaks).
>
> Depending on the networking CNI, and storage backend - Kubernetes can have
> a very close to bare metal performance. In the end it is always a
> trade-off. You gain some, you pay with extra overhead.
>
> I'm running YARN on kubernetes and mostly run Spark on top of YARN (some
> legacy MapReduce jobs too though) . Finding it much more manageable to
> allocate larger memory/cpu chunks to yarn pods and then have run
> auto-scaler to scale out YARN if needed; than to manage individual
> memory/cpu requirements on Spark on Kubernetes deployment.
>
> As far as I tested, Spark on Kubernetes is immature when reliability is
> concerned (or maybe our homegrown k8s does not do fencing/STONITH well
> yet). When a node dies / goes down, I find executors not getting
> rescheduled to other nodes - the driver just gets stuck for the executors
> to come back. This does not happen on YARN / Standalone deployment (even
> when ran on same k8s cluster)
>
> Sincerely,
>
> Leszek Reimus
>
>
>
>
> On Thu, Sep 29, 2022 at 7:06 PM Gourav Sengupta 
> wrote:
>
>> Hi,
>>
>> dont containers finally run on systems, and the only advantage of
>> containers is that you can do better utilisation of system resources by
>> micro management of jobs running in it? Some say that containers have their
>> own binaries which isolates environment, but that is a lie, because in a
>> kubernetes environments that is running your SPARK jobs you will have the
>> same environment for all your kubes.
>>
>> And as you can see there are several other configurations, disk mounting,
>> security, etc issues to handle as an overhead as well.
>>
>> And the entire goal of all those added configurations is that someone in
>> your devops team feels using containers makes things more interesting
>> without any real added advantage to large volume jobs.
>>
>> But I may be wrong, and perhaps we need data, and not personal attacks
>> like the other person in the thread did.
>>
>> In case anyone does not know EMR does run on containers as well, and in
>> EMR running on EC2 nodes you can put all your binaries in containers and
>> use those for running your jobs.
>>
>> Regards,
>> Gourav Sengupta
>>
>> On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus 
>> wrote:
>>
>>> Igor,
>>>
>>> what exact instance types do you use? Unless you use local instance
>>> storage and have actually configured your Kubernetes and Spark to use
>>> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
>>> investigate that by going to an instance, then to volume, and see
>>> monitoring charts.
>>>
>>> Another thought is that you're essentially giving 4GB per core. That
>>> sounds pretty low, in my experience.
>>>
>>>
>>>
>>> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria 
>>> wrote:
>>>
 Hi Everyone,

 I'm running spark 3.2 on kubernetes and have a job with a decently
 sized shuffle of almost 4TB. The relevant cluster config is as follows:

 - 30 Executors. 16 physical cores, configured with 32 Cores for spark
 - 128 GB RAM
 -  shuffle.partitions is 18k which gives me tasks of around 150~180MB

 The job runs fine but I'm bothered by how underutilized the cluster
 gets during the reduce phase. During the map(reading data from s3 and
 writing the shuffle data) 

Re: Help with Shuffle Read performance

2022-09-29 Thread Leszek Reimus
Hi Everyone,

To add my 2 cents here:

Advantage of containers, to me, is that it leaves the host system pristine
and clean, allowing standardized devops deployment of hardware for any
purpose. Way back before - when using bare metal / ansible, reusing hw
always involved full reformat of base system. This alone is worth the ~1-2%
performance tax cgroup containers have.

Advantage of kubernetes is more on the deployment side of things. Unified
deployment scripts that can be written by devs. Same deployment yaml (or
helm chart) can be used on local Dev Env / QA / Integration Env and finally
Prod (with some tweaks).

Depending on the networking CNI, and storage backend - Kubernetes can have
a very close to bare metal performance. In the end it is always a
trade-off. You gain some, you pay with extra overhead.

I'm running YARN on kubernetes and mostly run Spark on top of YARN (some
legacy MapReduce jobs too though) . Finding it much more manageable to
allocate larger memory/cpu chunks to yarn pods and then have run
auto-scaler to scale out YARN if needed; than to manage individual
memory/cpu requirements on Spark on Kubernetes deployment.

As far as I tested, Spark on Kubernetes is immature when reliability is
concerned (or maybe our homegrown k8s does not do fencing/STONITH well
yet). When a node dies / goes down, I find executors not getting
rescheduled to other nodes - the driver just gets stuck for the executors
to come back. This does not happen on YARN / Standalone deployment (even
when ran on same k8s cluster)

Sincerely,

Leszek Reimus




On Thu, Sep 29, 2022 at 7:06 PM Gourav Sengupta 
wrote:

> Hi,
>
> dont containers finally run on systems, and the only advantage of
> containers is that you can do better utilisation of system resources by
> micro management of jobs running in it? Some say that containers have their
> own binaries which isolates environment, but that is a lie, because in a
> kubernetes environments that is running your SPARK jobs you will have the
> same environment for all your kubes.
>
> And as you can see there are several other configurations, disk mounting,
> security, etc issues to handle as an overhead as well.
>
> And the entire goal of all those added configurations is that someone in
> your devops team feels using containers makes things more interesting
> without any real added advantage to large volume jobs.
>
> But I may be wrong, and perhaps we need data, and not personal attacks
> like the other person in the thread did.
>
> In case anyone does not know EMR does run on containers as well, and in
> EMR running on EC2 nodes you can put all your binaries in containers and
> use those for running your jobs.
>
> Regards,
> Gourav Sengupta
>
> On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus 
> wrote:
>
>> Igor,
>>
>> what exact instance types do you use? Unless you use local instance
>> storage and have actually configured your Kubernetes and Spark to use
>> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
>> investigate that by going to an instance, then to volume, and see
>> monitoring charts.
>>
>> Another thought is that you're essentially giving 4GB per core. That
>> sounds pretty low, in my experience.
>>
>>
>>
>> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria 
>> wrote:
>>
>>> Hi Everyone,
>>>
>>> I'm running spark 3.2 on kubernetes and have a job with a decently sized
>>> shuffle of almost 4TB. The relevant cluster config is as follows:
>>>
>>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>>> - 128 GB RAM
>>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>>
>>> The job runs fine but I'm bothered by how underutilized the cluster gets
>>> during the reduce phase. During the map(reading data from s3 and writing
>>> the shuffle data) CPU usage, disk throughput and network usage is as
>>> expected, but during the reduce phase it gets really low. It seems the main
>>> bottleneck is reading shuffle data from other nodes, task statistics
>>> reports values ranging from 25s to several minutes(the task sizes are
>>> really close, they aren't skewed). I've tried increasing
>>> "spark.reducer.maxSizeInFlight" and
>>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>>> a little, but not enough to saturate the cluster resources.
>>>
>>> Did I miss some more tuning parameters that could help?
>>> One obvious thing would be to vertically increase the machines and use
>>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>>> considering 30x30 connections.
>>>
>>> Thanks in advance!
>>>
>>>
>>
>> --
>> Vladimir Prus
>> http://vladimirprus.com
>>
>

-- 
--
"It is the common fate of the indolent to see their rights become a prey to
the active. The condition upon which God hath given liberty to man is
eternal vigilance; which condition if he break, servitude is at once the
consequence of his crime and the punishment of his guilt." - John Philpot

Re: Help with Shuffle Read performance

2022-09-29 Thread Gourav Sengupta
Hi,

Don't containers ultimately run on systems, and isn't the only advantage of
containers that you can get better utilisation of system resources by
micro-managing the jobs running in them? Some say that containers have their
own binaries which isolate the environment, but that is a lie, because in a
kubernetes environment that is running your SPARK jobs you will have the
same environment for all your kubes.

And as you can see there are several other configuration issues - disk
mounting, security, etc. - to handle as overhead as well.

And the entire goal of all those added configurations is that someone in
your devops team feels using containers makes things more interesting,
without any real added advantage for large-volume jobs.

But I may be wrong, and perhaps we need data, and not personal attacks like
the other person in the thread did.

In case anyone does not know EMR does run on containers as well, and in EMR
running on EC2 nodes you can put all your binaries in containers and use
those for running your jobs.

Regards,
Gourav Sengupta

On Thu, Sep 29, 2022 at 7:46 PM Vladimir Prus 
wrote:

> Igor,
>
> what exact instance types do you use? Unless you use local instance
> storage and have actually configured your Kubernetes and Spark to use
> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
> investigate that by going to an instance, then to volume, and see
> monitoring charts.
>
> Another thought is that you're essentially giving 4GB per core. That
> sounds pretty low, in my experience.
>
>
>
> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria 
> wrote:
>
>> Hi Everyone,
>>
>> I'm running spark 3.2 on kubernetes and have a job with a decently sized
>> shuffle of almost 4TB. The relevant cluster config is as follows:
>>
>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>> - 128 GB RAM
>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>
>> The job runs fine but I'm bothered by how underutilized the cluster gets
>> during the reduce phase. During the map(reading data from s3 and writing
>> the shuffle data) CPU usage, disk throughput and network usage is as
>> expected, but during the reduce phase it gets really low. It seems the main
>> bottleneck is reading shuffle data from other nodes, task statistics
>> reports values ranging from 25s to several minutes(the task sizes are
>> really close, they aren't skewed). I've tried increasing
>> "spark.reducer.maxSizeInFlight" and
>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>> a little, but not enough to saturate the cluster resources.
>>
>> Did I miss some more tuning parameters that could help?
>> One obvious thing would be to vertically increase the machines and use
>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>> considering 30x30 connections.
>>
>> Thanks in advance!
>>
>>
>
> --
> Vladimir Prus
> http://vladimirprus.com
>


Re: Help with Shuffle Read performance

2022-09-29 Thread Igor Calabria
> What's the total number of Partitions that you have ?
18k

> What machines are you using ? Are you using an SSD ?
A family of r5.4xlarge nodes. Yes, I'm using five GP3 disks, which
give me about 625 MB/s of sustained throughput (which is what I see when
writing the shuffle data).

> can you please provide whats the size of the shuffle file that is getting
generated in each task .
I have to check that. But total sizes of tasks are around 150 ~ 180 MB

> what exact instance types do you use? Unless you use local instance
storage and have actually configured your Kubernetes and Spark to use
instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
investigate that by going to an instance, then to volume, and see
monitoring charts.
That's a pretty good hint. I forgot to check the IOPS limit; I was only
looking at the throughput.

> Another thought is that you're essentially giving 4GB per core. That
sounds pretty low, in my experience.
For this particular job, it seems fine. No long GCs and peak usage per task
was reported at 1.4, so plenty of room.

Thanks a lot for the responses. I'm betting this is related to EBS IOPS
limits, since most of our jobs use instances with local disks instead.
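
For completeness, the two shuffle-read knobs mentioned in this thread can be
raised as sketched below (the values are illustrative, not the ones actually
used here; the defaults are 48m and 1 respectively):

// Illustrative shuffle-read tuning discussed in this thread.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("shuffle-read-tuning")
  .config("spark.reducer.maxSizeInFlight", "96m")         // more fetch data in flight per reducer
  .config("spark.shuffle.io.numConnectionsPerPeer", "2")  // more connections between each pair of hosts
  .getOrCreate()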

On Thu, Sep 29, 2022 at 7:44 PM Vladimir Prus 
wrote:

> Igor,
>
> what exact instance types do you use? Unless you use local instance
> storage and have actually configured your Kubernetes and Spark to use
> instance storage, your 30x30 exchange can run into EBS IOPS limits. You can
> investigate that by going to an instance, then to volume, and see
> monitoring charts.
>
> Another thought is that you're essentially giving 4GB per core. That
> sounds pretty low, in my experience.
>
>
>
> On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria 
> wrote:
>
>> Hi Everyone,
>>
>> I'm running spark 3.2 on kubernetes and have a job with a decently sized
>> shuffle of almost 4TB. The relevant cluster config is as follows:
>>
>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>> - 128 GB RAM
>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>
>> The job runs fine but I'm bothered by how underutilized the cluster gets
>> during the reduce phase. During the map(reading data from s3 and writing
>> the shuffle data) CPU usage, disk throughput and network usage is as
>> expected, but during the reduce phase it gets really low. It seems the main
>> bottleneck is reading shuffle data from other nodes, task statistics
>> reports values ranging from 25s to several minutes(the task sizes are
>> really close, they aren't skewed). I've tried increasing
>> "spark.reducer.maxSizeInFlight" and
>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>> a little, but not enough to saturate the cluster resources.
>>
>> Did I miss some more tuning parameters that could help?
>> One obvious thing would be to vertically increase the machines and use
>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>> considering 30x30 connections.
>>
>> Thanks in advance!
>>
>>
>
> --
> Vladimir Prus
> http://vladimirprus.com
>


Re: Help with Shuffle Read performance

2022-09-29 Thread Vladimir Prus
Igor,

what exact instance types do you use? Unless you use local instance storage
and have actually configured your Kubernetes and Spark to use instance
storage, your 30x30 exchange can run into EBS IOPS limits. You can
investigate that by going to an instance, then to volume, and see
monitoring charts.

Another thought is that you're essentially giving 4GB per core. That sounds
pretty low, in my experience.
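
As a rough illustration of that sizing point (numbers are only an example
derived from the figures in this thread, not a recommendation): halving the
task slots per 128 GB executor roughly doubles the memory available per task.

// Illustrative executor sizing: fewer cores per 128 GB executor means
// roughly twice the memory per task slot compared with 32 cores.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("executor-sizing-example")
  .config("spark.executor.instances", "30")
  .config("spark.executor.cores", "16")     // instead of 32
  .config("spark.executor.memory", "110g")  // leave headroom for overhead; illustrative
  .getOrCreate()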



On Thu, Sep 29, 2022 at 9:13 PM Igor Calabria 
wrote:

> Hi Everyone,
>
> I'm running spark 3.2 on kubernetes and have a job with a decently sized
> shuffle of almost 4TB. The relevant cluster config is as follows:
>
> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
> - 128 GB RAM
> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>
> The job runs fine but I'm bothered by how underutilized the cluster gets
> during the reduce phase. During the map(reading data from s3 and writing
> the shuffle data) CPU usage, disk throughput and network usage is as
> expected, but during the reduce phase it gets really low. It seems the main
> bottleneck is reading shuffle data from other nodes, task statistics
> reports values ranging from 25s to several minutes(the task sizes are
> really close, they aren't skewed). I've tried increasing
> "spark.reducer.maxSizeInFlight" and
> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
> a little, but not enough to saturate the cluster resources.
>
> Did I miss some more tuning parameters that could help?
> One obvious thing would be to vertically increase the machines and use
> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
> considering 30x30 connections.
>
> Thanks in advance!
>
>

-- 
Vladimir Prus
http://vladimirprus.com


Re: Help with Shuffle Read performance

2022-09-29 Thread Tufan Rakshit
That's total nonsense - EMR is total crap; use Kubernetes, I will help you.
Can you please provide the size of the shuffle file that is getting
generated in each task?
What's the total number of partitions that you have?
What machines are you using? Are you using an SSD?

Best
Tufan

On Thu, 29 Sept 2022 at 20:28, Gourav Sengupta 
wrote:

> Hi,
>
> why not use EMR or data proc, kubernetes does not provide any benefit at
> all for such scale of work. It is a classical case of over engineering and
> over complication just for the heck of it.
>
> Also I think that in case you are in AWS, Redshift Spectrum or Athena for
> 90% of use cases are way optimal.
>
> Regards,
> Gourav
>
> On Thu, Sep 29, 2022 at 7:13 PM Igor Calabria 
> wrote:
>
>> Hi Everyone,
>>
>> I'm running spark 3.2 on kubernetes and have a job with a decently sized
>> shuffle of almost 4TB. The relevant cluster config is as follows:
>>
>> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
>> - 128 GB RAM
>> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>>
>> The job runs fine but I'm bothered by how underutilized the cluster gets
>> during the reduce phase. During the map(reading data from s3 and writing
>> the shuffle data) CPU usage, disk throughput and network usage is as
>> expected, but during the reduce phase it gets really low. It seems the main
>> bottleneck is reading shuffle data from other nodes, task statistics
>> reports values ranging from 25s to several minutes(the task sizes are
>> really close, they aren't skewed). I've tried increasing
>> "spark.reducer.maxSizeInFlight" and
>> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
>> a little, but not enough to saturate the cluster resources.
>>
>> Did I miss some more tuning parameters that could help?
>> One obvious thing would be to vertically increase the machines and use
>> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
>> considering 30x30 connections.
>>
>> Thanks in advance!
>>
>>


Re: Help with Shuffle Read performance

2022-09-29 Thread Gourav Sengupta
Hi,

why not use EMR or data proc, kubernetes does not provide any benefit at
all for such scale of work. It is a classical case of over engineering and
over complication just for the heck of it.

Also I think that in case you are in AWS, Redshift Spectrum or Athena for
90% of use cases are way optimal.

Regards,
Gourav

On Thu, Sep 29, 2022 at 7:13 PM Igor Calabria 
wrote:

> Hi Everyone,
>
> I'm running spark 3.2 on kubernetes and have a job with a decently sized
> shuffle of almost 4TB. The relevant cluster config is as follows:
>
> - 30 Executors. 16 physical cores, configured with 32 Cores for spark
> - 128 GB RAM
> -  shuffle.partitions is 18k which gives me tasks of around 150~180MB
>
> The job runs fine but I'm bothered by how underutilized the cluster gets
> during the reduce phase. During the map(reading data from s3 and writing
> the shuffle data) CPU usage, disk throughput and network usage is as
> expected, but during the reduce phase it gets really low. It seems the main
> bottleneck is reading shuffle data from other nodes, task statistics
> reports values ranging from 25s to several minutes(the task sizes are
> really close, they aren't skewed). I've tried increasing
> "spark.reducer.maxSizeInFlight" and
> "spark.shuffle.io.numConnectionsPerPeer" and it did improve performance by
> a little, but not enough to saturate the cluster resources.
>
> Did I miss some more tuning parameters that could help?
> One obvious thing would be to vertically increase the machines and use
> less nodes to minimize traffic, but 30 nodes doesn't seem like much even
> considering 30x30 connections.
>
> Thanks in advance!
>
>


Re: Help With unstructured text file with spark scala

2022-02-25 Thread Danilo Sousa
Rafael Mendes,

Are you from ?

Thanks.
> On 21 Feb 2022, at 15:33, Danilo Sousa  wrote:
> 
> Yes, this a only single file.
> 
> Thanks Rafael Mendes.
> 
>> On 13 Feb 2022, at 07:13, Rafael Mendes > > wrote:
>> 
>> Hi, Danilo.
>> Do you have a single large file, only?
>> If so, I guess you can use tools like sed/awk to split it into more files 
>> based on layout, so you can read these files into Spark.
>> 
>> 
>> Em qua, 9 de fev de 2022 09:30, Bitfox > > escreveu:
>> Hi
>> 
>> I am not sure about the total situation.
>> But if you want a scala integration I think it could use regex to match and 
>> capture the keywords.
>> Here I wrote one you can modify by your end.
>> 
>> import scala.io.Source
>> import scala.collection.mutable.ArrayBuffer
>> 
>> val list1 = ArrayBuffer[(String,String,String)]()
>> val list2 = ArrayBuffer[(String,String)]()
>> 
>> 
>> val patt1 = """^(.*)#(.*)#([^#]*)$""".r
>> val patt2 = """^(.*)#([^#]*)$""".r
>> 
>> val file = "1.txt"
>> val lines = Source.fromFile(file).getLines()
>> 
>> for ( x <- lines ) {
>>   x match {
>> case patt1(k,v,z) => list1 += ((k,v,z))
>> case patt2(k,v) => list2 += ((k,v))
>> case _ => println("no match")
>>   }
>> }
>> 
>> 
>> Now the list1 and list2 have the elements you wanted, you can convert them 
>> to a dataframe easily.
>> 
>> Thanks.
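
A possible way to do that last conversion step (a sketch, assuming a running
SparkSession named spark; the column names are illustrative, not from the
original message):

// Sketch: turn the captured tuples into DataFrames.
import spark.implicits._

val df1 = list1.toSeq.toDF("key", "value", "extra")   // three-field lines
val df2 = list2.toSeq.toDF("key", "value")            // two-field lines

df1.show()
df2.show()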
>> 
>> On Wed, Feb 9, 2022 at 7:20 PM Danilo Sousa > > wrote:
>> Hello
>> 
>> 
>> Yes, for this block I can open as csv with # delimiter, but have the block 
>> that is no csv format. 
>> 
>> This is the likely key value. 
>> 
>> We have two different layouts in the same file. This is the “problem”.
>> 
>> Thanks for your time.
>> 
>> 
>> 
>>> Relação de Beneficiários Ativos e Excluídos
>>> Carteira em#27/12/2019##Todos os Beneficiários
>>> Operadora#AMIL
>>> Filial#SÃO PAULO#Unidade#Guarulhos
>>> 
>>> Contrato#123456 - Test
>>> Empresa#Test
>> 
>>> On 9 Feb 2022, at 00:58, Bitfox >> > wrote:
>>> 
>>> Hello
>>> 
>>> You can treat it as a csf file and load it from spark:
>>> 
>>> >>> df = spark.read.format("csv").option("inferSchema", 
>>> >>> "true").option("header", "true").option("sep","#").load(csv_file)
>>> >>> df.show()
>>> ++---+-+
>>> |   Plano|Código Beneficiário|Nome Beneficiário|
>>> ++---+-+
>>> |58693 - NACIONAL ...|   65751353|   Jose Silva|
>>> |58693 - NACIONAL ...|   65751388|  Joana Silva|
>>> |58693 - NACIONAL ...|   65751353| Felipe Silva|
>>> |58693 - NACIONAL ...|   65751388|  Julia Silva|
>>> ++---+-+
>>> 
>>> 
>>> cat csv_file:
>>> 
>>> Plano#Código Beneficiário#Nome Beneficiário
>>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>>> 
>>> 
>>> Regards
>>> 
>>> 
>>> On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa >> > wrote:
>>> Hi
>>> I have to transform unstructured text to dataframe.
>>> Could anyone please help with Scala code ?
>>> 
>>> Dataframe need as:
>>> 
>>> operadora filial unidade contrato empresa plano codigo_beneficiario 
>>> nome_beneficiario
>>> 
>>> Relação de Beneficiários Ativos e Excluídos
>>> Carteira em#27/12/2019##Todos os Beneficiários
>>> Operadora#AMIL
>>> Filial#SÃO PAULO#Unidade#Guarulhos
>>> 
>>> Contrato#123456 - Test
>>> Empresa#Test
>>> Plano#Código Beneficiário#Nome Beneficiário
>>> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>>> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
>>> 
>>> Contrato#898011000 - FUNDACAO GERDAU
>>> Empresa#FUNDACAO GERDAU
>>> Plano#Código Beneficiário#Nome Beneficiário
>>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>>> 
>>> 
>> 
> 



Re: Help With unstructured text file with spark scala

2022-02-21 Thread Danilo Sousa
Yes, this is only a single file.

Thanks Rafael Mendes.

> On 13 Feb 2022, at 07:13, Rafael Mendes  wrote:
> 
> Hi, Danilo.
> Do you have a single large file, only?
> If so, I guess you can use tools like sed/awk to split it into more files 
> based on layout, so you can read these files into Spark.
> 
> 
> Em qua, 9 de fev de 2022 09:30, Bitfox  escreveu:
> Hi
> 
> I am not sure about the total situation.
> But if you want a scala integration I think it could use regex to match and 
> capture the keywords.
> Here I wrote one you can modify by your end.
> 
> import scala.io.Source
> import scala.collection.mutable.ArrayBuffer
> 
> val list1 = ArrayBuffer[(String,String,String)]()
> val list2 = ArrayBuffer[(String,String)]()
> 
> 
> val patt1 = """^(.*)#(.*)#([^#]*)$""".r
> val patt2 = """^(.*)#([^#]*)$""".r
> 
> val file = "1.txt"
> val lines = Source.fromFile(file).getLines()
> 
> for ( x <- lines ) {
>   x match {
> case patt1(k,v,z) => list1 += ((k,v,z))
> case patt2(k,v) => list2 += ((k,v))
> case _ => println("no match")
>   }
> }
> 
> 
> Now the list1 and list2 have the elements you wanted, you can convert them to 
> a dataframe easily.
> 
> Thanks.
> 
> On Wed, Feb 9, 2022 at 7:20 PM Danilo Sousa  > wrote:
> Hello
> 
> 
> Yes, for this block I can open as csv with # delimiter, but have the block 
> that is no csv format. 
> 
> This is the likely key value. 
> 
> We have two different layouts in the same file. This is the “problem”.
> 
> Thanks for your time.
> 
> 
> 
>> Relação de Beneficiários Ativos e Excluídos
>> Carteira em#27/12/2019##Todos os Beneficiários
>> Operadora#AMIL
>> Filial#SÃO PAULO#Unidade#Guarulhos
>> 
>> Contrato#123456 - Test
>> Empresa#Test
> 
>> On 9 Feb 2022, at 00:58, Bitfox > > wrote:
>> 
>> Hello
>> 
>> You can treat it as a CSV file and load it from Spark:
>> 
>> >>> df = spark.read.format("csv").option("inferSchema", 
>> >>> "true").option("header", "true").option("sep","#").load(csv_file)
>> >>> df.show()
>> ++---+-+
>> |   Plano|Código Beneficiário|Nome Beneficiário|
>> ++---+-+
>> |58693 - NACIONAL ...|   65751353|   Jose Silva|
>> |58693 - NACIONAL ...|   65751388|  Joana Silva|
>> |58693 - NACIONAL ...|   65751353| Felipe Silva|
>> |58693 - NACIONAL ...|   65751388|  Julia Silva|
>> ++---+-+
>> 
>> 
>> cat csv_file:
>> 
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>> 
>> 
>> Regards
>> 
>> 
>> On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa > > wrote:
>> Hi
>> I have to transform unstructured text to dataframe.
>> Could anyone please help with Scala code ?
>> 
>> Dataframe need as:
>> 
>> operadora filial unidade contrato empresa plano codigo_beneficiario 
>> nome_beneficiario
>> 
>> Relação de Beneficiários Ativos e Excluídos
>> Carteira em#27/12/2019##Todos os Beneficiários
>> Operadora#AMIL
>> Filial#SÃO PAULO#Unidade#Guarulhos
>> 
>> Contrato#123456 - Test
>> Empresa#Test
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
>> 
>> Contrato#898011000 - FUNDACAO GERDAU
>> Empresa#FUNDACAO GERDAU
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
>> 
>> 
> 



Re: Help With unstructured text file with spark scala

2022-02-13 Thread Rafael Mendes
Hi, Danilo.
Do you have a single large file, only?
If so, I guess you can use tools like sed/awk to split it into more files
based on layout, so you can read these files into Spark.


Em qua, 9 de fev de 2022 09:30, Bitfox  escreveu:

> Hi
>
> I am not sure about the total situation.
> But if you want a scala integration I think it could use regex to match
> and capture the keywords.
> Here I wrote one you can modify by your end.
>
> import scala.io.Source
>
> import scala.collection.mutable.ArrayBuffer
>
>
> val list1 = ArrayBuffer[(String,String,String)]()
>
> val list2 = ArrayBuffer[(String,String)]()
>
>
>
> val patt1 = """^(.*)#(.*)#([^#]*)$""".r
>
> val patt2 = """^(.*)#([^#]*)$""".r
>
>
> val file = "1.txt"
>
> val lines = Source.fromFile(file).getLines()
>
>
> for ( x <- lines ) {
>
>   x match {
>
> case patt1(k,v,z) => list1 += ((k,v,z))
>
> case patt2(k,v) => list2 += ((k,v))
>
> case _ => println("no match")
>
>   }
>
> }
>
>
>
> Now the list1 and list2 have the elements you wanted, you can convert them
> to a dataframe easily.
>
>
> Thanks.
>
> On Wed, Feb 9, 2022 at 7:20 PM Danilo Sousa 
> wrote:
>
>> Hello
>>
>>
>> Yes, for this block I can open as csv with # delimiter, but have the
>> block that is no csv format.
>>
>> This is the likely key value.
>>
>> We have two different layouts in the same file. This is the “problem”.
>>
>> Thanks for your time.
>>
>>
>>
>> Relação de Beneficiários Ativos e Excluídos
>>> Carteira em#27/12/2019##Todos os Beneficiários
>>> Operadora#AMIL
>>> Filial#SÃO PAULO#Unidade#Guarulhos
>>>
>>> Contrato#123456 - Test
>>> Empresa#Test
>>
>>
>> On 9 Feb 2022, at 00:58, Bitfox  wrote:
>>
>> Hello
>>
>> You can treat it as a CSV file and load it from Spark:
>>
>> >>> df = spark.read.format("csv").option("inferSchema",
>> "true").option("header", "true").option("sep","#").load(csv_file)
>> >>> df.show()
>> ++---+-+
>> |   Plano|Código Beneficiário|Nome Beneficiário|
>> ++---+-+
>> |58693 - NACIONAL ...|   65751353|   Jose Silva|
>> |58693 - NACIONAL ...|   65751388|  Joana Silva|
>> |58693 - NACIONAL ...|   65751353| Felipe Silva|
>> |58693 - NACIONAL ...|   65751388|  Julia Silva|
>> ++---+-+
>>
>>
>> cat csv_file:
>>
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>>
>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>>
>>
>> Regards
>>
>>
>> On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa 
>> wrote:
>>
>>> Hi
>>> I have to transform unstructured text to dataframe.
>>> Could anyone please help with Scala code ?
>>>
>>> Dataframe need as:
>>>
>>> operadora filial unidade contrato empresa plano codigo_beneficiario
>>> nome_beneficiario
>>>
>>> Relação de Beneficiários Ativos e Excluídos
>>> Carteira em#27/12/2019##Todos os Beneficiários
>>> Operadora#AMIL
>>> Filial#SÃO PAULO#Unidade#Guarulhos
>>>
>>> Contrato#123456 - Test
>>> Empresa#Test
>>> Plano#Código Beneficiário#Nome Beneficiário
>>> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>>> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
>>>
>>> Contrato#898011000 - FUNDACAO GERDAU
>>> Empresa#FUNDACAO GERDAU
>>> Plano#Código Beneficiário#Nome Beneficiário
>>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>
>>


Re: Help With unstructured text file with spark scala

2022-02-09 Thread Bitfox
Hi

I am not sure about the whole situation.
But if you want to do this in Scala, I think you could use regexes to match
and capture the keywords.
Here is one I wrote that you can modify on your end.

import scala.io.Source

import scala.collection.mutable.ArrayBuffer


val list1 = ArrayBuffer[(String,String,String)]()

val list2 = ArrayBuffer[(String,String)]()



val patt1 = """^(.*)#(.*)#([^#]*)$""".r

val patt2 = """^(.*)#([^#]*)$""".r


val file = "1.txt"

val lines = Source.fromFile(file).getLines()


for ( x <- lines ) {

  x match {

case patt1(k,v,z) => list1 += ((k,v,z))

case patt2(k,v) => list2 += ((k,v))

case _ => println("no match")

  }

}



Now list1 and list2 hold the elements you wanted, and you can convert them
to a DataFrame easily, as sketched below.
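
For reference, a minimal sketch of that last step. It assumes a SparkSession
named spark (as in spark-shell), and the column names are only illustrative:

import spark.implicits._

// list1 holds the three-field rows matched by patt1, list2 the two-field rows
val df1 = list1.toSeq.toDF("key", "value", "extra")
val df2 = list2.toSeq.toDF("key", "value")

df1.show(false)
df2.show(false)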


Thanks.

On Wed, Feb 9, 2022 at 7:20 PM Danilo Sousa 
wrote:

> Hello
>
>
> Yes, for this block I can open as csv with # delimiter, but have the block
> that is no csv format.
>
> This is the likely key value.
>
> We have two different layouts in the same file. This is the “problem”.
>
> Thanks for your time.
>
>
>
> Relação de Beneficiários Ativos e Excluídos
>> Carteira em#27/12/2019##Todos os Beneficiários
>> Operadora#AMIL
>> Filial#SÃO PAULO#Unidade#Guarulhos
>>
>> Contrato#123456 - Test
>> Empresa#Test
>
>
> On 9 Feb 2022, at 00:58, Bitfox  wrote:
>
> Hello
>
> You can treat it as a CSV file and load it from Spark:
>
> >>> df = spark.read.format("csv").option("inferSchema",
> "true").option("header", "true").option("sep","#").load(csv_file)
> >>> df.show()
> ++---+-+
> |   Plano|Código Beneficiário|Nome Beneficiário|
> ++---+-+
> |58693 - NACIONAL ...|   65751353|   Jose Silva|
> |58693 - NACIONAL ...|   65751388|  Joana Silva|
> |58693 - NACIONAL ...|   65751353| Felipe Silva|
> |58693 - NACIONAL ...|   65751388|  Julia Silva|
> ++---+-+
>
>
> cat csv_file:
>
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>
> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>
>
> Regards
>
>
> On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa 
> wrote:
>
>> Hi
>> I have to transform unstructured text to dataframe.
>> Could anyone please help with Scala code ?
>>
>> Dataframe need as:
>>
>> operadora filial unidade contrato empresa plano codigo_beneficiario
>> nome_beneficiario
>>
>> Relação de Beneficiários Ativos e Excluídos
>> Carteira em#27/12/2019##Todos os Beneficiários
>> Operadora#AMIL
>> Filial#SÃO PAULO#Unidade#Guarulhos
>>
>> Contrato#123456 - Test
>> Empresa#Test
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
>>
>> Contrato#898011000 - FUNDACAO GERDAU
>> Empresa#FUNDACAO GERDAU
>> Plano#Código Beneficiário#Nome Beneficiário
>> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>> -
>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>
>>
>


Re: Help With unstructured text file with spark scala

2022-02-09 Thread Danilo Sousa
Hello, how are you?

Thanks for your time

> Does the data contain records? 
Yes
> Are the records "homogenous" ; ie; do they have the same fields?
Yes, the data is homogeneous, but it has “two layouts” in the same file.
> What is the format of the data?
All of the data is plain text in a .txt file
> Are records separated by lines/seperators?
Yes, the delimiter is “#”, but as said, we have two layouts in the same file.
This is the key-value-like layout:
>Carteira em#27/12/2019##Todos os Beneficiários
>Operadora#AMIL
>Filial#SÃO PAULO#Unidade#Guarulhos
> 
>Contrato#123456 - Test
>Empresa#Test

And this is the CSV-like layout:

>Plano#Código Beneficiário#Nome Beneficiário
>58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>58693 - NACIONAL R COPART PJCE#073930313#Maria Silva

> Is the data sharded across multiple files?
No
> How big is each shard?
Approximately 20 GB
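
For what it's worth, here is a rough sketch (not a definitive solution) of how
the CSV-like rows could be pulled out of the 20 GB file directly in Spark. The
path and column names are assumptions, and carrying the key-value context
(Contrato/Empresa) down onto each row would still need extra logic:

import org.apache.spark.sql.functions._

val raw = spark.read.text("/path/to/relatorio.txt")   // hypothetical path

val beneficiarios = raw
  .filter(size(split(col("value"), "#")) === 3)       // 3 fields => CSV-like layout
  .filter(!col("value").startsWith("Plano#"))         // drop repeated header lines
  .select(split(col("value"), "#").as("f"))
  .select(col("f")(0).as("plano"),
          col("f")(1).as("codigo_beneficiario"),
          col("f")(2).as("nome_beneficiario"))

beneficiarios.show(false)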

> On 8 Feb 2022, at 16:56, Lalwani, Jayesh  wrote:
> 
> You will need to provide more info.
> 
> Does the data contain records? 
> Are the records "homogenous" ; ie; do they have the same fields?
> What is the format of the data?
> Are records separated by lines/seperators?
> Is the data sharded across multiple files?
> How big is each shard?
> 
> 
> 
> On 2/8/22, 11:50 AM, "Danilo Sousa"  wrote:
> 
>CAUTION: This email originated from outside of the organization. Do not 
> click links or open attachments unless you can confirm the sender and know 
> the content is safe.
> 
> 
> 
>Hi
>I have to transform unstructured text to dataframe.
>Could anyone please help with Scala code ?
> 
>Dataframe need as:
> 
>operadora filial unidade contrato empresa plano codigo_beneficiario 
> nome_beneficiario
> 
>Relação de Beneficiários Ativos e Excluídos
>Carteira em#27/12/2019##Todos os Beneficiários
>Operadora#AMIL
>Filial#SÃO PAULO#Unidade#Guarulhos
> 
>Contrato#123456 - Test
>Empresa#Test
>Plano#Código Beneficiário#Nome Beneficiário
>58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
>58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
> 
>Contrato#898011000 - FUNDACAO GERDAU
>Empresa#FUNDACAO GERDAU
>Plano#Código Beneficiário#Nome Beneficiário
>58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
>58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
>58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
>58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
>-
>To unsubscribe e-mail: user-unsubscr...@spark.apache.org
> 
> 


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: Help With unstructured text file with spark scala

2022-02-09 Thread Danilo Sousa
Hello


Yes, this block I can open as CSV with the # delimiter, but there is a block
that is not in CSV format.

That block is the key-value-like part.

We have two different layouts in the same file. This is the “problem”.

Thanks for your time.



> Relação de Beneficiários Ativos e Excluídos
> Carteira em#27/12/2019##Todos os Beneficiários
> Operadora#AMIL
> Filial#SÃO PAULO#Unidade#Guarulhos
> 
> Contrato#123456 - Test
> Empresa#Test

> On 9 Feb 2022, at 00:58, Bitfox  wrote:
> 
> Hello
> 
> You can treat it as a CSV file and load it from Spark:
> 
> >>> df = spark.read.format("csv").option("inferSchema", 
> >>> "true").option("header", "true").option("sep","#").load(csv_file)
> >>> df.show()
> ++---+-+
> |   Plano|Código Beneficiário|Nome Beneficiário|
> ++---+-+
> |58693 - NACIONAL ...|   65751353|   Jose Silva|
> |58693 - NACIONAL ...|   65751388|  Joana Silva|
> |58693 - NACIONAL ...|   65751353| Felipe Silva|
> |58693 - NACIONAL ...|   65751388|  Julia Silva|
> ++---+-+
> 
> 
> cat csv_file:
> 
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
> 
> 
> Regards
> 
> 
> On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa  > wrote:
> Hi
> I have to transform unstructured text to dataframe.
> Could anyone please help with Scala code ?
> 
> Dataframe need as:
> 
> operadora filial unidade contrato empresa plano codigo_beneficiario 
> nome_beneficiario
> 
> Relação de Beneficiários Ativos e Excluídos
> Carteira em#27/12/2019##Todos os Beneficiários
> Operadora#AMIL
> Filial#SÃO PAULO#Unidade#Guarulhos
> 
> Contrato#123456 - Test
> Empresa#Test
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
> 
> Contrato#898011000 - FUNDACAO GERDAU
> Empresa#FUNDACAO GERDAU
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org 
> 
> 



Re: Help With unstructured text file with spark scala

2022-02-08 Thread Bitfox
Hello

You can treat it as a CSV file and load it from Spark:

>>> df = spark.read.format("csv").option("inferSchema",
"true").option("header", "true").option("sep","#").load(csv_file)

>>> df.show()

++---+-+

|   Plano|Código Beneficiário|Nome Beneficiário|

++---+-+

|58693 - NACIONAL ...|   65751353|   Jose Silva|

|58693 - NACIONAL ...|   65751388|  Joana Silva|

|58693 - NACIONAL ...|   65751353| Felipe Silva|

|58693 - NACIONAL ...|   65751388|  Julia Silva|

++---+-+



cat csv_file:


Plano#Código Beneficiário#Nome Beneficiário

58693 - NACIONAL R COPART PJCE#065751353#Jose Silva

58693 - NACIONAL R COPART PJCE#065751388#Joana Silva

58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva

58693 - NACIONAL R COPART PJCE#065751388#Julia Silva



Regards



On Wed, Feb 9, 2022 at 12:50 AM Danilo Sousa 
wrote:

> Hi
> I have to transform unstructured text to dataframe.
> Could anyone please help with Scala code ?
>
> Dataframe need as:
>
> operadora filial unidade contrato empresa plano codigo_beneficiario
> nome_beneficiario
>
> Relação de Beneficiários Ativos e Excluídos
> Carteira em#27/12/2019##Todos os Beneficiários
> Operadora#AMIL
> Filial#SÃO PAULO#Unidade#Guarulhos
>
> Contrato#123456 - Test
> Empresa#Test
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
> 58693 - NACIONAL R COPART PJCE#073930313#Maria Silva
>
> Contrato#898011000 - FUNDACAO GERDAU
> Empresa#FUNDACAO GERDAU
> Plano#Código Beneficiário#Nome Beneficiário
> 58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
> 58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
> 58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: Help With unstructured text file with spark scala

2022-02-08 Thread Lalwani, Jayesh
You will need to provide more info.

Does the data contain records? 
Are the records "homogenous" ; ie; do they have the same fields?
What is the format of the data?
Are records separated by lines/separators?
Is the data sharded across multiple files?
How big is each shard?



On 2/8/22, 11:50 AM, "Danilo Sousa"  wrote:

CAUTION: This email originated from outside of the organization. Do not 
click links or open attachments unless you can confirm the sender and know the 
content is safe.



Hi
I have to transform unstructured text to dataframe.
Could anyone please help with Scala code ?

Dataframe need as:

operadora filial unidade contrato empresa plano codigo_beneficiario 
nome_beneficiario

Relação de Beneficiários Ativos e Excluídos
Carteira em#27/12/2019##Todos os Beneficiários
Operadora#AMIL
Filial#SÃO PAULO#Unidade#Guarulhos

Contrato#123456 - Test
Empresa#Test
Plano#Código Beneficiário#Nome Beneficiário
58693 - NACIONAL R COPART PJCE#073930312#Joao Silva
58693 - NACIONAL R COPART PJCE#073930313#Maria Silva

Contrato#898011000 - FUNDACAO GERDAU
Empresa#FUNDACAO GERDAU
Plano#Código Beneficiário#Nome Beneficiário
58693 - NACIONAL R COPART PJCE#065751353#Jose Silva
58693 - NACIONAL R COPART PJCE#065751388#Joana Silva
58693 - NACIONAL R COPART PJCE#065751353#Felipe Silva
58693 - NACIONAL R COPART PJCE#065751388#Julia Silva
-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org




Re: help check my simple job

2022-02-06 Thread capitnfrakass

That did resolve my issue.
Thanks a lot.

frakass


On 06/02/2022 17:25, Hannes Bibel wrote:

Hi,

looks like you're packaging your application for Scala 2.13 (should be
specified in your build.sbt) while your Spark installation is built
for Scala 2.12.

Go to https://spark.apache.org/downloads.html, select under "Choose a
package type" the package type that says "Scala 2.13". With that
release you should be able to run your application.

In general, minor versions of Scala (e.g. 2.12 and 2.13) are
incompatible.

Best
Hannes

On Sun, Feb 6, 2022 at 10:01 AM  wrote:


Hello

I wrote this simple job in scala:

$ cat Myjob.scala
import org.apache.spark.sql.SparkSession

object Myjob {
def main(args: Array[String]): Unit = {
val sparkSession = SparkSession.builder.appName("Simple
Application").getOrCreate()
val sparkContext = sparkSession.sparkContext

val arrayRDD = sparkContext.parallelize(List(1,2,3,4,5,6,7,8))
println(arrayRDD.getClass, arrayRDD.count())
}
}

After package it then I submit it to spark, it gets the error:

$ /opt/spark/bin/spark-submit --class "Myjob" --master local[4]
target/scala-2.13/my-job_2.13-1.0.jar

Exception in thread "main" java.lang.NoSuchMethodError:
'scala.collection.immutable.ArraySeq
scala.runtime.ScalaRunTime$.wrapIntArray(int[])'
at Myjob$.main(Myjob.scala:8)
at Myjob.main(Myjob.scala)
at


java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native


Method)
at


java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at


java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at
java.base/java.lang.reflect.Method.invoke(Method.java:566)
at


org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)

at
org.apache.spark.deploy.SparkSubmit.org
[1]$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
at


org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)

at
org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at
org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at


org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)

at
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
at
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

What's the issue?

Thank you.



-

To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Links:
--
[1] http://org.apache.spark.deploy.SparkSubmit.org


-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: help check my simple job

2022-02-06 Thread Hannes Bibel
Hi,

looks like you're packaging your application for Scala 2.13 (should be
specified in your build.sbt) while your Spark installation is built for
Scala 2.12.

Go to https://spark.apache.org/downloads.html, select under "Choose a
package type" the package type that says "Scala 2.13". With that release
you should be able to run your application.

In general, different minor versions of Scala (e.g. 2.12 and 2.13) are
binary-incompatible.
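
For reference, a minimal build.sbt sketch (the version numbers are just
placeholders) that keeps the application on the same Scala line as the Spark
distribution:

// build.sbt -- align scalaVersion with the Scala build of your Spark install;
// %% makes sbt pick the artifact compiled for that Scala version.
ThisBuild / scalaVersion := "2.12.15"

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.2.1" % "provided"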

Best
Hannes


On Sun, Feb 6, 2022 at 10:01 AM  wrote:

> Hello
>
>   I wrote this simple job in scala:
>
> $ cat Myjob.scala
> import org.apache.spark.sql.SparkSession
>
> object Myjob {
>def main(args: Array[String]): Unit = {
>  val sparkSession = SparkSession.builder.appName("Simple
> Application").getOrCreate()
>  val sparkContext = sparkSession.sparkContext
>
>  val arrayRDD = sparkContext.parallelize(List(1,2,3,4,5,6,7,8))
>  println(arrayRDD.getClass, arrayRDD.count())
>}
> }
>
>
> After package it then I submit it to spark, it gets the error:
>
> $ /opt/spark/bin/spark-submit --class "Myjob" --master local[4]
> target/scala-2.13/my-job_2.13-1.0.jar
>
> Exception in thread "main" java.lang.NoSuchMethodError:
> 'scala.collection.immutable.ArraySeq
> scala.runtime.ScalaRunTime$.wrapIntArray(int[])'
> at Myjob$.main(Myjob.scala:8)
> at Myjob.main(Myjob.scala)
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method)
> at
>
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.base/java.lang.reflect.Method.invoke(Method.java:566)
> at
>
> org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
> at
> org.apache.spark.deploy.SparkSubmit.org
> $apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
> at
> org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
> at
> org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
> at
> org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
> at
>
> org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
> at
> org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> What's the issue?
>
> Thank you.
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: help on use case - spark parquet processing

2020-08-13 Thread Amit Sharma
Can you keep Option fields in your case class?
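
A rough sketch of that idea, with made-up field and path names: declare the
fields that may be missing or null in some of the three inputs as Option[...],
then map the DataFrame to a Dataset.

case class Address(name: Option[String])
case class Account(`type`: Option[String], addresses: Option[Seq[Address]])
case class Customer(name: Option[String], accounts: Option[Seq[Account]])

import spark.implicits._

// Option covers null/missing values; if a column is absent from the Parquet
// schema altogether, it still has to be added (e.g. withColumn("col",
// lit(null).cast(...))) before .as[Customer] will resolve.
val customers = spark.read.parquet("/path/to/customer").as[Customer]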


Thanks
Amit

On Thu, Aug 13, 2020 at 12:47 PM manjay kumar 
wrote:

> Hi ,
>
> I have a use case,
>
> where I need to merge three datasets and build one wherever data is
> available.
>
> And my dataset is a complex object.
>
> Customer
> - name - string
> - accounts - List
>
> Account
> - type - String
> - Adressess - List
>
> Address
> -name - String
>
> 
>
> ---
>
>
> And it goes on.
>
> These files are in Parquet,
>
>
> All 3 input datasets have some details, which need to be merged.
>
> And build one dataset, which has all the information (I know the files
> which need to be merged).
>
>
> I want to know , how should I proceed on this  ??
>
> - my approach is to build a case class of the actual output and parse the three
> datasets.
>  (but this is failing because the input responses do not have all the fields).
>
> So basically, what should be the approach to deal with this kind of problem?
>
> 2nd, how can I convert a Parquet DataFrame to a Dataset, considering that the
> Parquet struct does not have all the fields but the case class has all the
> fields (I am getting the error "no struct type found")?
>
> Thanks
> Manjay Kumar
> 8320 120 839
>
>
>


Re: help understanding physical plan

2019-08-16 Thread Marcelo Valle
Thanks Tianlang. I saw the DAG on YARN, but what really solved my problem
is adding intermediate steps and evaluating them eagerly to find out where
the bottleneck was.
My process now runs in 6 min. :D
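
For anyone hitting the same thing, a minimal sketch of the "evaluate the
intermediate steps eagerly" approach, with made-up DataFrame names: persist
each intermediate result and force it with an action, then time each step (or
watch it in the Spark UI) to see which one is the real bottleneck.

import org.apache.spark.storage.StorageLevel

val small = smallDf.persist(StorageLevel.MEMORY_AND_DISK_SER)
println(s"small rows: ${small.count()}")          // forces the first stage

val joined = largeDf.join(small, Seq("key"), "left_anti")
  .persist(StorageLevel.MEMORY_AND_DISK_SER)
println(s"joined rows: ${joined.count()}")        // forces the join in isolation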

Thanks for the help.

[]s

On Thu, 15 Aug 2019 at 07:25, Tianlang 
wrote:

> Hi,
>
> Maybe you can look at the spark ui. The physical plan has no time
> consuming information.
> 在 2019/8/13 下午10:45, Marcelo Valle 写道:
>
> Hi,
>
> I have a job running on AWS EMR. It's basically a join between 2 tables
> (parquet files on s3), one somehow large (around 50 gb) and other small
> (less than 1gb).
> The small table is the result of other operations, but it was a dataframe
> with `.persist(StorageLevel.MEMORY_AND_DISK_SER)` and the count on this
> dataframe finishes quickly.
> When I run my "LEFT_ANTI" join, I get the execution plan down bellow.
> While most of my jobs on larges amount of data take max 1 h on this
> cluster, this one takes almost 1 day to complete.
>
> What could I be doing wrong? I am trying to analyze the plan, but I can't
> find anything that justify the slowness. It has 2 shuffles followed by a
> zip, but other jobs have similar things and they are not that slow.
>
> Could anyone point me to possible actions I could take to investigate this?
>
> Thanks,
> Marcelo.
>
> == Physical Plan ==
> *(2) Project [USAGE_AGGREGATED_METADATA_ID#1493,
> SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702,
> USAGE_AGGREGATED_METADATA_HASH#1513]
> +- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493, ),
> coalesce(SENDER_RECORDING_IDENTIFIER#1499, )],
> [coalesce(USAGE_AGGREGATED_METADATA_ID#356, ),
> coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight,
> ((USAGE_AGGREGATED_METADATA_ID#356 <=> USAGE_AGGREGATED_METADATA_ID#1493)
> && (SENDER_RECORDING_IDENTIFIER#357 <=> SENDER_RECORDING_IDENTIFIER#1499))
>:- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493,
> SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
>: +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493,
> ISRC#1494, ISWC#1495, RECORDING_TITLE#1496, RECORDING_DISPLAY_ARTIST#1497,
> WORK_WRITERS#1498, SENDER_RECORDING_IDENTIFIER#1499,
> RECORDING_VERSION_TITLE#1500, WORK_TITLE#1501, CONTENT_TYPE#1502,
> USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1 replicas)
>:   +- *(2) Project [ID#328 AS
> USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS
> ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS
> RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS WORK_WRITERS#1498,
> uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null AS
> RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS
> CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null,
> artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS
> USAGE_AGGREGATED_METADATA_HASH#1513]
>:  +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ),
> coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ),
> coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )],
> [coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ),
> coalesce(artist_name#292, ), coalesce(work_writer_names#293, )], Inner,
> BuildLeft, (isrc#289 <=> isrc_1#1419) && (iswc#290 <=> iswc_1#1420)) &&
> (track_name#291 <=> track_name_1#1421)) && (artist_name#292 <=>
> artist_name_1#1422)) && (work_writer_names#293 <=>
> work_writer_names_1#1423))
>: :- BroadcastExchange
> HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ),
> coalesce(input[2, string, true], ), coalesce(input[3, string, true], ),
> coalesce(input[4, string, true], ), coalesce(input[5, string, true], )))
>: :  +- *(1) Project [ID#328, isrc#289 AS isrc_1#1419,
> iswc#290 AS iswc_1#1420, track_name#291 AS track_name_1#1421,
> artist_name#292 AS artist_name_1#1422, work_writer_names#293 AS
> work_writer_names_1#1423]
>: : +- *(1) Filter isnotnull(ID#328)
>: :+- InMemoryTableScan [ID#328,
> artist_name#292, isrc#289, iswc#290, track_name#291,
> work_writer_names#293], [isnotnull(ID#328)]
>: :  +- InMemoryRelation [ID#328, isrc#289,
> iswc#290, track_name#291, artist_name#292, work_writer_names#293],
> StorageLevel(disk, memory, 1 replicas)
>: :+- *(2) Project [ID#328,
> isrc#289, iswc#290, track_name#291, artist_name#292, work_writer_names#293]
>: :   +- *(2) BroadcastHashJoin
> [coalesce(ISRC#329, ), coalesce(ISWC#330, ), coalesce(RECORDING_TITLE#331,
> ), coalesce(RECORDING_DISPLAY_ARTIST#332, ), coalesce(WORK_WRITERS#333, )],
> [coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, ),
> coalesce(substring(artist_name#292, 0, 1000), ),
> coalesce(work_writer_names#293, )], RightOuter, BuildLeft, (isrc#289
> <=> ISRC#329) && 

Re: help understanding physical plan

2019-08-15 Thread Tianlang

Hi,

Maybe you can look at the Spark UI. The physical plan has no timing
information.


在 2019/8/13 下午10:45, Marcelo Valle 写道:

Hi,

I have a job running on AWS EMR. It's basically a join between 2 
tables (parquet files on s3), one somehow large (around 50 gb) and 
other small (less than 1gb).
The small table is the result of other operations, but it was a 
dataframe with `.persist(StorageLevel.MEMORY_AND_DISK_SER)` and the 
count on this dataframe finishes quickly.
When I run my "LEFT_ANTI" join, I get the execution plan down bellow. 
While most of my jobs on larges amount of data take max 1 h on this 
cluster, this one takes almost 1 day to complete.


What could I be doing wrong? I am trying to analyze the plan, but I 
can't find anything that justify the slowness. It has 2 shuffles 
followed by a zip, but other jobs have similar things and they are not 
that slow.


Could anyone point me to possible actions I could take to investigate 
this?


Thanks,
Marcelo.

== Physical Plan ==
*(2) Project [USAGE_AGGREGATED_METADATA_ID#1493, 
SENDER_RECORDING_IDENTIFIER#1499, AIP127258 AS SENDER_IP_ID#1702, 
USAGE_AGGREGATED_METADATA_HASH#1513]
+- *(2) BroadcastHashJoin [coalesce(USAGE_AGGREGATED_METADATA_ID#1493, 
), coalesce(SENDER_RECORDING_IDENTIFIER#1499, )], 
[coalesce(USAGE_AGGREGATED_METADATA_ID#356, ), 
coalesce(SENDER_RECORDING_IDENTIFIER#357, )], LeftAnti, BuildRight, 
((USAGE_AGGREGATED_METADATA_ID#356 <=> 
USAGE_AGGREGATED_METADATA_ID#1493) && (SENDER_RECORDING_IDENTIFIER#357 
<=> SENDER_RECORDING_IDENTIFIER#1499))
   :- InMemoryTableScan [USAGE_AGGREGATED_METADATA_ID#1493, 
SENDER_RECORDING_IDENTIFIER#1499, USAGE_AGGREGATED_METADATA_HASH#1513]
   :     +- InMemoryRelation [USAGE_AGGREGATED_METADATA_ID#1493, 
ISRC#1494, ISWC#1495, RECORDING_TITLE#1496, 
RECORDING_DISPLAY_ARTIST#1497, WORK_WRITERS#1498, 
SENDER_RECORDING_IDENTIFIER#1499, RECORDING_VERSION_TITLE#1500, 
WORK_TITLE#1501, CONTENT_TYPE#1502, 
USAGE_AGGREGATED_METADATA_HASH#1513], StorageLevel(disk, memory, 1 
replicas)
   :           +- *(2) Project [ID#328 AS 
USAGE_AGGREGATED_METADATA_ID#1493, isrc#289 AS ISRC#1494, iswc#290 AS 
ISWC#1495, track_name#291 AS RECORDING_TITLE#1496, artist_name#292 AS 
RECORDING_DISPLAY_ARTIST#1497, work_writer_names#293 AS 
WORK_WRITERS#1498, uri#286 AS SENDER_RECORDING_IDENTIFIER#1499, null 
AS RECORDING_VERSION_TITLE#1500, null AS WORK_TITLE#1501, SOUND AS 
CONTENT_TYPE#1502, UDF(array(isrc#289, track_name#291, null, 
artist_name#292, iswc#290, null, work_writer_names#293, SOUND)) AS 
USAGE_AGGREGATED_METADATA_HASH#1513]
   :              +- *(2) BroadcastHashJoin [coalesce(isrc_1#1419, ), 
coalesce(iswc_1#1420, ), coalesce(track_name_1#1421, ), 
coalesce(artist_name_1#1422, ), coalesce(work_writer_names_1#1423, )], 
[coalesce(isrc#289, ), coalesce(iswc#290, ), coalesce(track_name#291, 
), coalesce(artist_name#292, ), coalesce(work_writer_names#293, )], 
Inner, BuildLeft, (isrc#289 <=> isrc_1#1419) && (iswc#290 <=> 
iswc_1#1420)) && (track_name#291 <=> track_name_1#1421)) && 
(artist_name#292 <=> artist_name_1#1422)) && (work_writer_names#293 
<=> work_writer_names_1#1423))
   :                 :- BroadcastExchange 
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ), 
coalesce(input[2, string, true], ), coalesce(input[3, string, true], 
), coalesce(input[4, string, true], ), coalesce(input[5, string, 
true], )))
   :                 :  +- *(1) Project [ID#328, isrc#289 AS 
isrc_1#1419, iswc#290 AS iswc_1#1420, track_name#291 AS 
track_name_1#1421, artist_name#292 AS artist_name_1#1422, 
work_writer_names#293 AS work_writer_names_1#1423]

   :                 :     +- *(1) Filter isnotnull(ID#328)
   :                 :        +- InMemoryTableScan [ID#328, 
artist_name#292, isrc#289, iswc#290, track_name#291, 
work_writer_names#293], [isnotnull(ID#328)]
   :                 :              +- InMemoryRelation [ID#328, 
isrc#289, iswc#290, track_name#291, artist_name#292, 
work_writer_names#293], StorageLevel(disk, memory, 1 replicas)
   :                 :                    +- *(2) Project [ID#328, 
isrc#289, iswc#290, track_name#291, artist_name#292, 
work_writer_names#293]
   :                 :                       +- *(2) BroadcastHashJoin 
[coalesce(ISRC#329, ), coalesce(ISWC#330, ), 
coalesce(RECORDING_TITLE#331, ), 
coalesce(RECORDING_DISPLAY_ARTIST#332, ), coalesce(WORK_WRITERS#333, 
)], [coalesce(isrc#289, ), coalesce(iswc#290, ), 
coalesce(track_name#291, ), coalesce(substring(artist_name#292, 0, 
1000), ), coalesce(work_writer_names#293, )], RightOuter, BuildLeft, 
(isrc#289 <=> ISRC#329) && (iswc#290 <=> ISWC#330)) && 
(track_name#291 <=> RECORDING_TITLE#331)) && 
(substring(artist_name#292, 0, 1000) <=> 
RECORDING_DISPLAY_ARTIST#332)) && (work_writer_names#293 <=> 
WORK_WRITERS#333))
   :                 :                          :- BroadcastExchange 
HashedRelationBroadcastMode(List(coalesce(input[1, string, true], ), 
coalesce(input[2, string, true], ), 

Re: Help: What's the biggest length of SQL that's supported in SparkSQL?

2019-07-12 Thread Reynold Xin
No sorry I'm not at liberty to share other people's code.

On Fri, Jul 12, 2019 at 9:33 AM, Gourav Sengupta < gourav.sengu...@gmail.com > 
wrote:

> 
> Hi Reynold,
> 
> 
> I am genuinely curious about queries which are more than 1 MB and am
> stunned by tens of MB's. Any samples to share :) 
> 
> 
> Regards,
> Gourav
> 
> On Thu, Jul 11, 2019 at 5:03 PM Reynold Xin < rxin@ databricks. com (
> r...@databricks.com ) > wrote:
> 
> 
>> There is no explicit limit but a JVM string cannot be bigger than 2G. It
>> will also at some point run out of memory with too big of a query plan
>> tree or become incredibly slow due to query planning complexity. I've seen
>> queries that are tens of MBs in size.
>> 
>> 
>> 
>> 
>> 
>> 
>> On Thu, Jul 11, 2019 at 5:01 AM, 李书明 < alemmontree@ 126. com (
>> alemmont...@126.com ) > wrote:
>> 
>>> I have a question about the limit(biggest) of SQL's length that is
>>> supported in SparkSQL. I can't find the answer in the documents of Spark.
>>> 
>>> 
>>> Maybe Interger.MAX_VALUE or not ?
>>> 
>> 
>> 
> 
>

Re: Help: What's the biggest length of SQL that's supported in SparkSQL?

2019-07-12 Thread Gourav Sengupta
Hi Reynold,

I am genuinely curious about queries which are more than 1 MB and am
stunned by tens of MB's. Any samples to share :)

Regards,
Gourav

On Thu, Jul 11, 2019 at 5:03 PM Reynold Xin  wrote:

> There is no explicit limit but a JVM string cannot be bigger than 2G. It
> will also at some point run out of memory with too big of a query plan tree
> or become incredibly slow due to query planning complexity. I've seen
> queries that are tens of MBs in size.
>
>
>
> On Thu, Jul 11, 2019 at 5:01 AM, 李书明  wrote:
>
>> I have a question about the limit(biggest) of SQL's length that is
>> supported in SparkSQL. I can't find the answer in the documents of Spark.
>>
>> Maybe Interger.MAX_VALUE or not ?
>>
>>
>


Re: Help: What's the biggest length of SQL that's supported in SparkSQL?

2019-07-11 Thread Reynold Xin
There is no explicit limit but a JVM string cannot be bigger than 2G. It will 
also at some point run out of memory with too big of a query plan tree or 
become incredibly slow due to query planning complexity. I've seen queries that 
are tens of MBs in size.

On Thu, Jul 11, 2019 at 5:01 AM, 李书明 < alemmont...@126.com > wrote:

> 
> I have a question about the limit(biggest) of SQL's length that is
> supported in SparkSQL. I can't find the answer in the documents of Spark.
> 
> 
> Maybe Interger.MAX_VALUE or not ?
> 
> 
> 
>

Re: [HELP WANTED] Apache Zipkin (incubating) needs Spark gurus

2019-03-21 Thread Reynold Xin
Are there specific questions you have? Might be easier to post them here
also.

On Wed, Mar 20, 2019 at 5:16 PM Andriy Redko  wrote:

> Hello Dear Spark Community!
>
> The huge popularity of Apache Spark has made it a de-facto choice for many
> projects that need some sort of data processing capability. One of
> those is
> Zipkin, currently incubating at Apache [1], the widespread distributed
> tracing framework.
> The small amazing team behind the project maintains around ~40 different
> integrations
> and components, including the [2], a set of Spark jobs to reconstruct over
> time the
> service dependency graphs from the collected traces. The current
> maintainers are not
> yet savvy on Spark and the team really struggles to address the ongoing
> issues and
> answer user questions. For example, users are reporting concerns about job
> distribution
> which the Zipkin team doesn't know how to answer. It is really difficult
> to keep this
> particular component up and running due to the lack of the Spark
> expertise.
>
> Hence this message to the community: is anyone interested in
> distributed
> tracing (a fascinating area in itself!) to the point of stepping in, helping
> with Spark
> expertise and contributing? Please feel free to reach out on Gitter @ [3]
> or d...@zipkin.apache.org!
>
> [1] http://incubator.apache.org/projects/zipkin.html
> [2] https://github.com/openzipkin/zipkin-dependencies
> [3] https://gitter.im/openzipkin/zipkin
>
> Best Regards,
> Apache Zipkin (incubating) Team
>
>
> -
> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>
>


Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-20 Thread Kazuaki Ishizaki
If it is difficult to create a small stand-alone program, another 
approach is to attach everything (i.e. configuration, data, program, 
console output, logs, history server data, etc.).
For the log, the community would recommend the info log with 
"spark.sql.codegen.logging.maxLines=2147483647". The log has to include 
all of the generated Java methods.
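
As a minimal sketch, that setting can be applied when the session is created
(the application name here is just a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("codegen-log-capture")
  .config("spark.sql.codegen.logging.maxLines", Int.MaxValue.toString)  // 2147483647
  .getOrCreate()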

The community may take more time to address this problem than it would 
with a small program.

Best Regards,
Kazuaki Ishizaki



From:   Aakash Basu 
To: Kazuaki Ishizaki 
Cc: vaquar khan , Eyal Zituny 
, user 
Date:   2018/06/21 01:29
Subject:    Re: [Help] Codegen Stage grows beyond 64 KB



Hi Kazuaki,

It would be really difficult to produce a small S-A code to reproduce this 
problem because, I'm running through a big pipeline of feature engineering 
where I derive a lot of variables based on the present ones to kind of 
explode the size of the table by many folds. Then, when I do any kind of 
join, this error shoots up.

I tried with wholeStage.codegen=false, but that errors out the entire 
program rather than running it with a lesser optimized code.

Any suggestion on how I can proceed towards a JIRA entry for this?

Thanks,
Aakash.

On Wed, Jun 20, 2018 at 9:41 PM, Kazuaki Ishizaki  
wrote:
Spark 2.3 tried to split a large generated Java methods into small methods 
as possible. However, this report may remain places that generates a large 
method.

Would it be possible to create a JIRA entry with a small stand alone 
program that can reproduce this problem? It would be very helpful that the 
community will address this problem.

Best regards,
Kazuaki Ishizaki



From:vaquar khan 
To:Eyal Zituny 
Cc:Aakash Basu , user <
user@spark.apache.org>
Date:2018/06/18 01:57
Subject:    Re: [Help] Codegen Stage grows beyond 64 KB




Totally agreed with Eyal .

The problem is that when Java programs generated using Catalyst from 
programs using DataFrame and Dataset are compiled into Java bytecode, the 
size of byte code of one method must not be 64 KB or more, This conflicts 
with the limitation of the Java class file, which is an exception that 
occurs. 

In order to avoid occurrence of an exception due to this restriction, 
within Spark, a solution is to split the methods that compile and make 
Java bytecode that is likely to be over 64 KB into multiple methods when 
Catalyst generates Java programs It has been done.

Use persist or any other logical separation in pipeline.

Regards,
Vaquar khan 

On Sun, Jun 17, 2018 at 5:25 AM, Eyal Zituny  
wrote:
Hi Akash,
such errors might appear in large spark pipelines, the root cause is a 
64kb jvm limitation.
the reason that your job isn't failing at the end is due to spark fallback 
- if code gen is failing, spark compiler will try to create the flow 
without the code gen (less optimized)
if you do not want to see this error, you can either disable code gen 
using the flag:  spark.sql.codegen.wholeStage= "false"
or you can try to split your complex pipeline into several spark flows if 
possible

hope that helps

Eyal

On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu  
wrote:
Hi,

I already went through it, that's one use case. I've a complex and very 
big pipeline of multiple jobs under one spark session. Not getting, on how 
to solve this, as it is happening over Logistic Regression and Random 
Forest models, which I'm just using from Spark ML package rather than 
doing anything by myself.

Thanks,
Aakash.

On Sun 17 Jun, 2018, 8:21 AM vaquar khan,  wrote:
Hi Akash,

Please check stackoverflow.

https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe


Regards,
Vaquar khan

On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu  
wrote:
Hi guys,

I'm getting an error when I'm feature engineering on 30+ columns to create 
about 200+ columns. It is not failing the job, but the ERROR shows. I want 
to know how can I avoid this.

Spark - 2.3.1
Python - 3.6

Cluster Config -
1 Master - 32 GB RAM, 16 Cores
4 Slaves - 16 GB RAM, 8 Cores


Input data - 8 partitions of parquet file with snappy compression.

My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
--num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 
5 --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf 
spark.driver.maxResultSize=2G --conf 
"spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf 
spark.scheduler.listenerbus.eventqueue.capacity=2 --conf 
spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py 
> /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt


Stack-Trace below -

ERROR CodeGenerator:91 - failed to compile: 
org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": 
Code of method "processNext()V" of class 
"org.apache.spark.sql.catalyst.expressions.Genera

Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-20 Thread Aakash Basu
Hi Kazuaki,

It would be really difficult to produce a small stand-alone program to reproduce
this problem because I'm running through a big feature-engineering pipeline
where I derive a lot of variables from the existing ones, which
explodes the width of the table many-fold. Then, when I do any kind of
join, this error appears.

I tried with wholeStage.codegen=false, but that errors out the entire
program rather than running it with a less optimized code path.

Any suggestion on how I can proceed towards a JIRA entry for this?

Thanks,
Aakash.

On Wed, Jun 20, 2018 at 9:41 PM, Kazuaki Ishizaki 
wrote:

> Spark 2.3 tried to split a large generated Java methods into small methods
> as possible. However, this report may remain places that generates a large
> method.
>
> Would it be possible to create a JIRA entry with a small stand alone
> program that can reproduce this problem? It would be very helpful that the
> community will address this problem.
>
> Best regards,
> Kazuaki Ishizaki
>
>
>
> From:vaquar khan 
> To:Eyal Zituny 
> Cc:Aakash Basu , user <
> user@spark.apache.org>
> Date:2018/06/18 01:57
> Subject:Re: [Help] Codegen Stage grows beyond 64 KB
> --
>
>
>
> Totally agreed with Eyal .
>
> The problem is that when Java programs generated using Catalyst from
> programs using DataFrame and Dataset are compiled into Java bytecode, the
> size of byte code of one method must not be 64 KB or more, This conflicts
> with the limitation of the Java class file, which is an exception that
> occurs.
>
> In order to avoid occurrence of an exception due to this restriction,
> within Spark, a solution is to split the methods that compile and make Java
> bytecode that is likely to be over 64 KB into multiple methods when
> Catalyst generates Java programs It has been done.
>
> Use persist or any other logical separation in pipeline.
>
> Regards,
> Vaquar khan
>
> On Sun, Jun 17, 2018 at 5:25 AM, Eyal Zituny <*eyal.zit...@equalum.io*
> > wrote:
> Hi Akash,
> such errors might appear in large spark pipelines, the root cause is a
> 64kb jvm limitation.
> the reason that your job isn't failing at the end is due to spark fallback
> - if code gen is failing, spark compiler will try to create the flow
> without the code gen (less optimized)
> if you do not want to see this error, you can either disable code gen
> using the flag:  spark.sql.codegen.wholeStage= "false"
> or you can try to split your complex pipeline into several spark flows if
> possible
>
> hope that helps
>
> Eyal
>
> On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu <*aakash.spark@gmail.com*
> > wrote:
> Hi,
>
> I already went through it, that's one use case. I've a complex and very
> big pipeline of multiple jobs under one spark session. Not getting, on how
> to solve this, as it is happening over Logistic Regression and Random
> Forest models, which I'm just using from Spark ML package rather than doing
> anything by myself.
>
> Thanks,
> Aakash.
>
> On Sun 17 Jun, 2018, 8:21 AM vaquar khan, <*vaquar.k...@gmail.com*
> > wrote:
> Hi Akash,
>
> Please check stackoverflow.
>
>
> *https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe*
> <https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe>
>
> Regards,
> Vaquar khan
>
> On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu <*aakash.spark@gmail.com*
> > wrote:
> Hi guys,
>
> I'm getting an error when I'm feature engineering on 30+ columns to create
> about 200+ columns. It is not failing the job, but the ERROR shows. I want
> to know how can I avoid this.
>
> Spark - 2.3.1
> Python - 3.6
>
> Cluster Config -
> 1 Master - 32 GB RAM, 16 Cores
> 4 Slaves - 16 GB RAM, 8 Cores
>
>
> Input data - 8 partitions of parquet file with snappy compression.
>
> My Spark-Submit -> spark-submit --master spark://*192.168.60.20:7077*
> <http://192.168.60.20:7077>--num-executors 4 --executor-cores 5
> --executor-memory 10G --driver-cores 5 --driver-memory 25G --conf
> spark.sql.shuffle.partitions=60 --conf spark.driver.maxResultSize=2G
> --conf "spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf
> spark.scheduler.listenerbus.eventqueue.capacity=2 --conf
> spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py
> > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt
>
>
> Stack-Trace below -
>
> ERROR CodeGenerator:91 - failed to compile: 
> o

Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-20 Thread Kazuaki Ishizaki
Spark 2.3 tries to split large generated Java methods into smaller methods 
where possible. However, as this report shows, there may remain places that 
generate a large method.

Would it be possible to create a JIRA entry with a small stand-alone 
program that can reproduce this problem? It would be very helpful for the 
community in addressing this problem.

Best regards,
Kazuaki Ishizaki



From:   vaquar khan 
To: Eyal Zituny 
Cc: Aakash Basu , user 

Date:   2018/06/18 01:57
Subject:Re: [Help] Codegen Stage grows beyond 64 KB



Totally agreed with Eyal .

The problem is that when Java programs generated using Catalyst from 
programs using DataFrame and Dataset are compiled into Java bytecode, the 
size of byte code of one method must not be 64 KB or more, This conflicts 
with the limitation of the Java class file, which is an exception that 
occurs. 

In order to avoid occurrence of an exception due to this restriction, 
within Spark, a solution is to split the methods that compile and make 
Java bytecode that is likely to be over 64 KB into multiple methods when 
Catalyst generates Java programs It has been done.

Use persist or any other logical separation in pipeline.

Regards,
Vaquar khan 

On Sun, Jun 17, 2018 at 5:25 AM, Eyal Zituny  
wrote:
Hi Akash,
such errors might appear in large spark pipelines, the root cause is a 
64kb jvm limitation.
the reason that your job isn't failing at the end is due to spark fallback 
- if code gen is failing, spark compiler will try to create the flow 
without the code gen (less optimized)
if you do not want to see this error, you can either disable code gen 
using the flag:  spark.sql.codegen.wholeStage= "false"
or you can try to split your complex pipeline into several spark flows if 
possible

hope that helps

Eyal

On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu  
wrote:
Hi,

I already went through it, that's one use case. I've a complex and very 
big pipeline of multiple jobs under one spark session. Not getting, on how 
to solve this, as it is happening over Logistic Regression and Random 
Forest models, which I'm just using from Spark ML package rather than 
doing anything by myself.

Thanks,
Aakash.

On Sun 17 Jun, 2018, 8:21 AM vaquar khan,  wrote:
Hi Akash,

Please check stackoverflow.

https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe

Regards,
Vaquar khan

On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu  
wrote:
Hi guys,

I'm getting an error when I'm feature engineering on 30+ columns to create 
about 200+ columns. It is not failing the job, but the ERROR shows. I want 
to know how can I avoid this.

Spark - 2.3.1
Python - 3.6

Cluster Config -
1 Master - 32 GB RAM, 16 Cores
4 Slaves - 16 GB RAM, 8 Cores


Input data - 8 partitions of parquet file with snappy compression.

My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077 
--num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 
5 --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf 
spark.driver.maxResultSize=2G --conf 
"spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf 
spark.scheduler.listenerbus.eventqueue.capacity=2 --conf 
spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py 
> /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt

Stack-Trace below -

ERROR CodeGenerator:91 - failed to compile: 
org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": 
Code of method "processNext()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
 
grows beyond 64 KB
org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass": 
Code of method "processNext()V" of class 
"org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
 
grows beyond 64 KB
at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
at 
org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
at 
org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
at 
org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
at 
org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
at 
org.spark_project.guava.cache.LocalCache$LoadingValueRefer

Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-17 Thread vaquar khan
Totally agreed with Eyal .

The problem is that when the Java code that Catalyst generates for
DataFrame and Dataset programs is compiled into Java bytecode, the
bytecode of a single method must not reach 64 KB. This is a limitation
of the Java class file format, and exceeding it is what raises the
exception.

To avoid this exception, Spark's approach is to have Catalyst split
methods that are likely to compile to more than 64 KB of bytecode into
multiple smaller methods when it generates the Java code.

Use persist or some other logical separation in your pipeline; a sketch
follows below.
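
A rough sketch of that kind of separation, with hypothetical stage functions:
persisting (and materializing) between heavy feature-engineering stages means
each downstream plan starts from the cached data instead of the full lineage,
which tends to keep the generated methods smaller.

import org.apache.spark.storage.StorageLevel

val stage1 = addBaseFeatures(inputDf).persist(StorageLevel.MEMORY_AND_DISK)
stage1.count()                                   // materialize stage 1

val stage2 = addDerivedFeatures(stage1).persist(StorageLevel.MEMORY_AND_DISK)
stage2.count()                                   // materialize stage 2 before the models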

Regards,
Vaquar khan

On Sun, Jun 17, 2018 at 5:25 AM, Eyal Zituny  wrote:

> Hi Akash,
> such errors might appear in large spark pipelines, the root cause is a
> 64kb jvm limitation.
> the reason that your job isn't failing at the end is due to spark fallback
> - if code gen is failing, spark compiler will try to create the flow
> without the code gen (less optimized)
> if you do not want to see this error, you can either disable code gen
> using the flag:  spark.sql.codegen.wholeStage= "false"
> or you can try to split your complex pipeline into several spark flows if
> possible
>
> hope that helps
>
> Eyal
>
> On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu 
> wrote:
>
>> Hi,
>>
>> I already went through it, that's one use case. I've a complex and very
>> big pipeline of multiple jobs under one spark session. Not getting, on how
>> to solve this, as it is happening over Logistic Regression and Random
>> Forest models, which I'm just using from Spark ML package rather than doing
>> anything by myself.
>>
>> Thanks,
>> Aakash.
>>
>> On Sun 17 Jun, 2018, 8:21 AM vaquar khan,  wrote:
>>
>>> Hi Akash,
>>>
>>> Please check stackoverflow.
>>>
>>> https://stackoverflow.com/questions/41098953/codegen-grows-
>>> beyond-64-kb-error-when-normalizing-large-pyspark-dataframe
>>>
>>> Regards,
>>> Vaquar khan
>>>
>>> On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu >> > wrote:
>>>
 Hi guys,

 I'm getting an error when I'm feature engineering on 30+ columns to
 create about 200+ columns. It is not failing the job, but the ERROR shows.
 I want to know how can I avoid this.

 Spark - 2.3.1
 Python - 3.6

 Cluster Config -
 1 Master - 32 GB RAM, 16 Cores
 4 Slaves - 16 GB RAM, 8 Cores


 Input data - 8 partitions of parquet file with snappy compression.

 My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
 --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5
 --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf
 spark.driver.maxResultSize=2G --conf 
 "spark.executor.extraJavaOptions=-XX:+UseParallelGC"
 --conf spark.scheduler.listenerbus.eventqueue.capacity=2 --conf
 spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py
 > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_
 33_col.txt

 Stack-Trace below -

 ERROR CodeGenerator:91 - failed to compile:
> org.codehaus.janino.InternalCompilerException: Compiling
> "GeneratedClass": Code of method "processNext()V" of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$Ge
> neratedIteratorForCodegenStage3426" grows beyond 64 KB
> org.codehaus.janino.InternalCompilerException: Compiling
> "GeneratedClass": Code of method "processNext()V" of class
> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$Ge
> neratedIteratorForCodegenStage3426" grows beyond 64 KB
> at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.
> java:361)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:
> 234)
> at org.codehaus.janino.SimpleCompiler.compileToClassLoader(Simp
> leCompiler.java:446)
> at org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassB
> odyEvaluator.java:313)
> at org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluat
> or.java:235)
> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:
> 204)
> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenera
> tor$.org$apache$spark$sql$catalyst$expressions$codegen$C
> odeGenerator$$doCompile(CodeGenerator.scala:1417)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenera
> tor$$anon$1.load(CodeGenerator.scala:1493)
> at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenera
> tor$$anon$1.load(CodeGenerator.scala:1490)
> at org.spark_project.guava.cache.LocalCache$LoadingValueReferen
> ce.loadFuture(LocalCache.java:3599)
> at org.spark_project.guava.cache.LocalCache$Segment.loadSync(Lo
> calCache.java:2379)
> at org.spark_project.guava.cache.LocalCache$Segment.lockedGetOr
> 

Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-17 Thread Eyal Zituny
Hi Akash,
Such errors can appear in large Spark pipelines; the root cause is the JVM's
64 KB limit on the bytecode size of a single method.
The reason your job isn't failing in the end is Spark's fallback: if code
generation fails, Spark will build the plan without whole-stage codegen
(less optimized, but functional).
If you do not want to see this error, you can either disable whole-stage code
generation with the flag spark.sql.codegen.wholeStage=false,
or try to split your complex pipeline into several separate Spark flows if
possible.
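
For reference, a minimal sketch of the first option (the same property can be
passed with --conf at spark-submit time):

// session-level fallback: stages run interpreted instead of whole-stage generated
spark.conf.set("spark.sql.codegen.wholeStage", "false")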

hope that helps

Eyal

On Sun, Jun 17, 2018 at 8:16 AM, Aakash Basu 
wrote:

> Hi,
>
> I already went through it, that's one use case. I've a complex and very
> big pipeline of multiple jobs under one spark session. Not getting, on how
> to solve this, as it is happening over Logistic Regression and Random
> Forest models, which I'm just using from Spark ML package rather than doing
> anything by myself.
>
> Thanks,
> Aakash.
>
> On Sun 17 Jun, 2018, 8:21 AM vaquar khan,  wrote:
>
>> Hi Akash,
>>
>> Please check stackoverflow.
>>
>> https://stackoverflow.com/questions/41098953/codegen-
>> grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe
>>
>> Regards,
>> Vaquar khan
>>
>> On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu 
>> wrote:
>>
>>> Hi guys,
>>>
>>> I'm getting an error when I'm feature engineering on 30+ columns to
>>> create about 200+ columns. It is not failing the job, but the ERROR shows.
>>> I want to know how can I avoid this.
>>>
>>> Spark - 2.3.1
>>> Python - 3.6
>>>
>>> Cluster Config -
>>> 1 Master - 32 GB RAM, 16 Cores
>>> 4 Slaves - 16 GB RAM, 8 Cores
>>>
>>>
>>> Input data - 8 partitions of parquet file with snappy compression.
>>>
>>> My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
>>> --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5
>>> --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf
>>> spark.driver.maxResultSize=2G --conf "spark.executor.
>>> extraJavaOptions=-XX:+UseParallelGC" --conf 
>>> spark.scheduler.listenerbus.eventqueue.capacity=2
>>> --conf spark.sql.codegen=true 
>>> /appdata/bblite-codebase/pipeline_data_test_run.py
>>> > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt
>>>
>>> Stack-Trace below -
>>>
>>> ERROR CodeGenerator:91 - failed to compile: 
>>> org.codehaus.janino.InternalCompilerException:
 Compiling "GeneratedClass": Code of method "processNext()V" of class
 "org.apache.spark.sql.catalyst.expressions.GeneratedClass$
 GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
 org.codehaus.janino.InternalCompilerException: Compiling
 "GeneratedClass": Code of method "processNext()V" of class
 "org.apache.spark.sql.catalyst.expressions.GeneratedClass$
 GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
 at org.codehaus.janino.UnitCompiler.compileUnit(
 UnitCompiler.java:361)
 at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
 at org.codehaus.janino.SimpleCompiler.compileToClassLoader(
 SimpleCompiler.java:446)
 at org.codehaus.janino.ClassBodyEvaluator.compileToClass(
 ClassBodyEvaluator.java:313)
 at org.codehaus.janino.ClassBodyEvaluator.cook(
 ClassBodyEvaluator.java:235)
 at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
 at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
 at org.apache.spark.sql.catalyst.expressions.codegen.
 CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$
 CodeGenerator$$doCompile(CodeGenerator.scala:1417)
 at org.apache.spark.sql.catalyst.expressions.codegen.
 CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
 at org.apache.spark.sql.catalyst.expressions.codegen.
 CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
 at org.spark_project.guava.cache.LocalCache$LoadingValueReference.
 loadFuture(LocalCache.java:3599)
 at org.spark_project.guava.cache.LocalCache$Segment.loadSync(
 LocalCache.java:2379)
 at org.spark_project.guava.cache.LocalCache$Segment.
 lockedGetOrLoad(LocalCache.java:2342)
 at org.spark_project.guava.cache.LocalCache$Segment.get(
 LocalCache.java:2257)
 at org.spark_project.guava.cache.LocalCache.get(LocalCache.
 java:4000)
 at org.spark_project.guava.cache.LocalCache.getOrLoad(
 LocalCache.java:4004)
 at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.
 get(LocalCache.java:4874)
 at org.apache.spark.sql.catalyst.expressions.codegen.
 CodeGenerator$.compile(CodeGenerator.scala:1365)
 at org.apache.spark.sql.execution.WholeStageCodegenExec.
 liftedTree1$1(WholeStageCodegenExec.scala:579)
 at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(
 WholeStageCodegenExec.scala:578)
 at org.apache.spark.sql.execution.SparkPlan$$anonfun$
 execute$1.apply(SparkPlan.scala:131)
  

Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-16 Thread Aakash Basu
Hi,

I already went through it; that's one use case. I have a complex and very big
pipeline of multiple jobs under one Spark session. I can't figure out how to
solve this, as it is happening with the Logistic Regression and Random Forest
models, which I'm simply using from the Spark ML package rather than
implementing anything myself.

Thanks,
Aakash.

On Sun 17 Jun, 2018, 8:21 AM vaquar khan,  wrote:

> Hi Akash,
>
> Please check stackoverflow.
>
>
> https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe
>
> Regards,
> Vaquar khan
>
> On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu 
> wrote:
>
>> Hi guys,
>>
>> I'm getting an error when I'm feature engineering on 30+ columns to
>> create about 200+ columns. It is not failing the job, but the ERROR shows.
>> I want to know how can I avoid this.
>>
>> Spark - 2.3.1
>> Python - 3.6
>>
>> Cluster Config -
>> 1 Master - 32 GB RAM, 16 Cores
>> 4 Slaves - 16 GB RAM, 8 Cores
>>
>>
>> Input data - 8 partitions of parquet file with snappy compression.
>>
>> My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
>> --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5
>> --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf
>> spark.driver.maxResultSize=2G --conf
>> "spark.executor.extraJavaOptions=-XX:+UseParallelGC" --conf
>> spark.scheduler.listenerbus.eventqueue.capacity=2 --conf
>> spark.sql.codegen=true /appdata/bblite-codebase/pipeline_data_test_run.py >
>> /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt
>>
>> Stack-Trace below -
>>
>> ERROR CodeGenerator:91 - failed to compile:
>>> org.codehaus.janino.InternalCompilerException: Compiling "GeneratedClass":
>>> Code of method "processNext()V" of class
>>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
>>> grows beyond 64 KB
>>> org.codehaus.janino.InternalCompilerException: Compiling
>>> "GeneratedClass": Code of method "processNext()V" of class
>>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3426"
>>> grows beyond 64 KB
>>> at
>>> org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
>>> at
>>> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
>>> at
>>> org.codehaus.janino.ClassBodyEvaluator.compileToClass(ClassBodyEvaluator.java:313)
>>> at
>>> org.codehaus.janino.ClassBodyEvaluator.cook(ClassBodyEvaluator.java:235)
>>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
>>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:1417)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
>>> at
>>> org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
>>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>>> at
>>> org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
>>> at
>>> org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
>>> at
>>> org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1365)
>>> at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.liftedTree1$1(WholeStageCodegenExec.scala:579)
>>> at
>>> org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:578)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
>>> at
>>> org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
>>> at
>>> org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
>>> at
>>> org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>>> at
>>> 

Re: [Help] Codegen Stage grows beyond 64 KB

2018-06-16 Thread vaquar khan
Hi Akash,

Please check stackoverflow.

https://stackoverflow.com/questions/41098953/codegen-grows-beyond-64-kb-error-when-normalizing-large-pyspark-dataframe

Regards,
Vaquar khan

On Sat, Jun 16, 2018 at 3:27 PM, Aakash Basu 
wrote:

> Hi guys,
>
> I'm getting an error when I'm feature engineering on 30+ columns to create
> about 200+ columns. It is not failing the job, but the ERROR shows. I want
> to know how can I avoid this.
>
> Spark - 2.3.1
> Python - 3.6
>
> Cluster Config -
> 1 Master - 32 GB RAM, 16 Cores
> 4 Slaves - 16 GB RAM, 8 Cores
>
>
> Input data - 8 partitions of parquet file with snappy compression.
>
> My Spark-Submit -> spark-submit --master spark://192.168.60.20:7077
> --num-executors 4 --executor-cores 5 --executor-memory 10G --driver-cores 5
> --driver-memory 25G --conf spark.sql.shuffle.partitions=60 --conf
> spark.driver.maxResultSize=2G --conf "spark.executor.
> extraJavaOptions=-XX:+UseParallelGC" --conf 
> spark.scheduler.listenerbus.eventqueue.capacity=2
> --conf spark.sql.codegen=true 
> /appdata/bblite-codebase/pipeline_data_test_run.py
> > /appdata/bblite-data/logs/log_10_iter_pipeline_8_partitions_33_col.txt
>
> Stack-Trace below -
>
> ERROR CodeGenerator:91 - failed to compile: 
> org.codehaus.janino.InternalCompilerException:
>> Compiling "GeneratedClass": Code of method "processNext()V" of class
>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$
>> GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
>> org.codehaus.janino.InternalCompilerException: Compiling
>> "GeneratedClass": Code of method "processNext()V" of class
>> "org.apache.spark.sql.catalyst.expressions.GeneratedClass$
>> GeneratedIteratorForCodegenStage3426" grows beyond 64 KB
>> at org.codehaus.janino.UnitCompiler.compileUnit(
>> UnitCompiler.java:361)
>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
>> at org.codehaus.janino.SimpleCompiler.compileToClassLoader(
>> SimpleCompiler.java:446)
>> at org.codehaus.janino.ClassBodyEvaluator.compileToClass(
>> ClassBodyEvaluator.java:313)
>> at org.codehaus.janino.ClassBodyEvaluator.cook(
>> ClassBodyEvaluator.java:235)
>> at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
>> at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$
>> CodeGenerator$$doCompile(CodeGenerator.scala:1417)
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$$anon$1.load(CodeGenerator.scala:1493)
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$$anon$1.load(CodeGenerator.scala:1490)
>> at org.spark_project.guava.cache.LocalCache$LoadingValueReference.
>> loadFuture(LocalCache.java:3599)
>> at org.spark_project.guava.cache.LocalCache$Segment.loadSync(
>> LocalCache.java:2379)
>> at org.spark_project.guava.cache.LocalCache$Segment.
>> lockedGetOrLoad(LocalCache.java:2342)
>> at org.spark_project.guava.cache.LocalCache$Segment.get(
>> LocalCache.java:2257)
>> at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
>> at org.spark_project.guava.cache.LocalCache.getOrLoad(
>> LocalCache.java:4004)
>> at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.
>> get(LocalCache.java:4874)
>> at org.apache.spark.sql.catalyst.expressions.codegen.
>> CodeGenerator$.compile(CodeGenerator.scala:1365)
>> at org.apache.spark.sql.execution.WholeStageCodegenExec.
>> liftedTree1$1(WholeStageCodegenExec.scala:579)
>> at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(
>> WholeStageCodegenExec.scala:578)
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
>> execute$1.apply(SparkPlan.scala:131)
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
>> execute$1.apply(SparkPlan.scala:127)
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
>> executeQuery$1.apply(SparkPlan.scala:155)
>> at org.apache.spark.rdd.RDDOperationScope$.withScope(
>> RDDOperationScope.scala:151)
>> at org.apache.spark.sql.execution.SparkPlan.
>> executeQuery(SparkPlan.scala:152)
>> at org.apache.spark.sql.execution.SparkPlan.execute(
>> SparkPlan.scala:127)
>> at org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.
>> prepareShuffleDependency(ShuffleExchangeExec.scala:92)
>> at org.apache.spark.sql.execution.exchange.
>> ShuffleExchangeExec$$anonfun$doExecute$1.apply(
>> ShuffleExchangeExec.scala:128)
>> at org.apache.spark.sql.execution.exchange.
>> ShuffleExchangeExec$$anonfun$doExecute$1.apply(
>> ShuffleExchangeExec.scala:119)
>> at org.apache.spark.sql.catalyst.errors.package$.attachTree(
>> package.scala:52)
>> at org.apache.spark.sql.execution.exchange.
>> ShuffleExchangeExec.doExecute(ShuffleExchangeExec.scala:119)
>> at org.apache.spark.sql.execution.SparkPlan$$anonfun$
>> 

Re: Help explaining explain() after DataFrame join reordering

2018-06-05 Thread Matteo Cossu
Hello,

as explained here, the join order can be changed by the optimizer. The
difference introduced in Spark 2.2 is that the reordering is based on
statistics instead of heuristics, which can appear "random" and in some cases
decrease performance.
If you want more control over the join order, you can define your own Rule;
an example is linked here.


Best,

Matteo


On 1 June 2018 at 18:31, Mohamed Nadjib MAMI 
wrote:

> Dear Sparkers,
>
> I'm loading into DataFrames data from 5 sources (using official
> connectors): Parquet, MongoDB, Cassandra, MySQL and CSV. I'm then joining
> those DataFrames in two different orders.
> - mongo * cassandra * jdbc * parquet * csv (random order).
> - parquet * csv * cassandra * jdbc * mongodb (optimized order).
>
> The first follows a random order, whereas the second I'm deciding based on
> some optimization techniques (can provide details for the interested
> readers or if needed here).
>
> After the evaluation on increasing sizes of data, the optimization
> techniques I developed didn't improve the performance very noticeably. I
> inspected the Logical/Physical plan of the final joined DataFrame (using
> `explain(true)`). The 1st order was respected, whereas the 2nd order, it
> turned out, wasn't respected, and MongoDB was queried first.
>
> However, that what it seemed to me, I'm not quite confident reading the
> Plans (returned using explain(true)). Could someone help explaining the
> `explain(true)` output? (pasted in this gist
> ). Is
> there a way we could enforce the given order?
>
> I'm using Spark 2.1, so I think it doesn't include the new cost-based
> optimizations (introduced in Spark 2.2).
>
> *Regards, Grüße, **Cordialement,** Recuerdos, Saluti, προσρήσεις, 问候,
> تحياتي.*
> *Mohamed Nadjib Mami*
> *Research Associate @ Fraunhofer IAIS - PhD Student @ Bonn University*
> *About me! *
> *LinkedIn *
>


Re: help needed in performance improvement of spark structured streaming

2018-05-30 Thread amit kumar singh
Hi team,

Any help with this?



I have a use case where I need to call a stored procedure from structured
streaming.

I am able to send the Kafka message and call the stored procedure,

but since the foreach sink keeps executing the stored procedure per message,

I want to combine all the messages into a single dataframe and then call the
stored procedure once per batch.

Is it possible to do this?


current code

select('value cast "string",'topic)
  .select('topic,concat_ws(",", 'value cast "string") as 'value1)
 .groupBy('topic cast "string").count()
.coalesce(1)
.as[String]
.writeStream
.trigger(ProcessingTime("60 seconds"))
.option("checkpointLocation", checkpointUrl)
.foreach(new SimpleSqlServerSink(jdbcUrl, connectionProperties))
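
As a hedged sketch only, assuming Spark 2.4 or later where
DataStreamWriter.foreachBatch is available: it hands each micro-batch to you
as one DataFrame, so the stored procedure can be called once per trigger
instead of once per message. streamDf stands for the Kafka source DataFrame,
and callStoredProcedure is an assumed helper, not an existing API.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

streamDf.writeStream
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .option("checkpointLocation", checkpointUrl)
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    // one DataFrame per trigger; aggregate it and call the procedure once
    val payload = batch.selectExpr("CAST(value AS STRING) AS value")
      .collect()
      .map(_.getString(0))
      .mkString(",")
    callStoredProcedure(jdbcUrl, connectionProperties, payload)  // assumed helper
  }
  .start()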







On Sat, May 5, 2018 at 12:20 PM, amit kumar singh 
wrote:

> Hi Community,
>
> I have a use case where i need to call stored procedure through structured
> streaming.
>
> I am able to send kafka message and call stored procedure ,
>
> but since foreach sink keeps on executing stored procedure per message
>
> i want to combine all the messages in single dtaframe and then call
> stored procedure at once
>
> is it possible to do
>
>
> current code
>
> select('value cast "string",'topic)
>   .select('topic,concat_ws(",", 'value cast "string") as 'value1)
>  .groupBy('topic cast "string").count()
> .coalesce(1)
> .as[String]
> .writeStream
> .trigger(ProcessingTime("60 seconds"))
> .option("checkpointLocation", checkpointUrl)
> .foreach(new SimpleSqlServerSink(jdbcUrl, connectionProperties))
>
>
>
>
> thanks
> rohit
>


Re: help with streaming batch interval question needed

2018-05-25 Thread Peter Liu
 Hi Jacek,

This is exactly what I'm looking for. Thanks!!

Also thanks for the link. I just noticed that I can unfold the trigger link
and see the examples in Java and Scala - a great help for a newcomer :-)
http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

def trigger(trigger: Trigger): DataStreamWriter[T]

Set the trigger for the stream query. The default value is ProcessingTime(0)
and it will run the query as fast as possible.

Scala Example:
df.writeStream.trigger(ProcessingTime("10 seconds"))

import scala.concurrent.duration._
df.writeStream.trigger(ProcessingTime(10.seconds))

Java Example:
df.writeStream().trigger(ProcessingTime.create("10 seconds"))

import java.util.concurrent.TimeUnit
df.writeStream().trigger(ProcessingTime.create(10, TimeUnit.SECONDS))

Much appreciated!

Peter


On Fri, May 25, 2018 at 9:11 AM, Jacek Laskowski  wrote:

> Hi Peter,
>
> > Basically I need to find a way to set the batch-interval in (b), similar
> as in (a) below.
>
> That's trigger method on DataStreamWriter.
>
> http://spark.apache.org/docs/latest/api/scala/index.html#
> org.apache.spark.sql.streaming.DataStreamWriter
>
> import org.apache.spark.sql.streaming.Trigger
> df.writeStream.trigger(Trigger.ProcessingTime("1 second"))
>
> See http://spark.apache.org/docs/latest/structured-
> streaming-programming-guide.html#triggers
>
> Pozdrawiam,
> Jacek Laskowski
> 
> https://about.me/JacekLaskowski
> Mastering Spark SQL https://bit.ly/mastering-spark-sql
> Spark Structured Streaming https://bit.ly/spark-structured-streaming
> Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
> Follow me at https://twitter.com/jaceklaskowski
>
> On Thu, May 24, 2018 at 10:14 PM, Peter Liu  wrote:
>
>> Hi there,
>>
>> from my apache spark streaming website (see links below),
>>
>>- the batch-interval is set when a spark StreamingContext is
>>constructed (see example (a) quoted below)
>>- the StreamingContext is available in older and new Spark version
>>(v1.6, v2.2 to v2.3.0) (see https://spark.apache.org/docs/
>>1.6.0/streaming-programming-guide.html
>>
>>and https://spark.apache.org/docs/2.3.0/streaming-programming-gu
>>ide.html )
>>- however, example (b) below  doesn't use StreamingContext, but
>>StreamingSession object to setup a streaming flow;
>>
>> What does the usage difference in (a) and (b) mean? I was wondering if
>> this would mean a different streaming approach ("traditional" streaming vs
>> structured streaming?
>>
>> Basically I need to find a way to set the batch-interval in (b), similar
>> as in (a) below.
>>
>> Would be great if someone can please share some insights here.
>>
>> Thanks!
>>
>> Peter
>>
>> (a)
>> https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html )
>>
>> import org.apache.spark._import org.apache.spark.streaming._
>> val conf = new SparkConf().setAppName(appName).setMaster(master)val *ssc *= 
>> new StreamingContext(conf, Seconds(1))
>>
>>
>> (b)
>> ( from databricks' https://databricks.com/blog/20
>> 17/04/26/processing-data-in-apache-kafka-with-structured-str
>> eaming-in-apache-spark-2-2.html)
>>
>>val *spark *= SparkSession.builder()
>> .appName(appName)
>>   .getOrCreate()
>> ...
>>
>> jsonOptions = { "timestampFormat": nestTimestampFormat }
>> parsed = *spark *\
>>   .readStream \
>>   .format("kafka") \
>>   .option("kafka.bootstrap.servers", "localhost:9092") \
>>   .option("subscribe", "nest-logs") \
>>   .load() \
>>   .select(from_json(col("value").cast("string"), schema, 
>> jsonOptions).alias("parsed_value"))
>>
>>
>>
>>
>>
>


Re: help with streaming batch interval question needed

2018-05-25 Thread Jacek Laskowski
Hi Peter,

> Basically I need to find a way to set the batch-interval in (b), similar
as in (a) below.

That's trigger method on DataStreamWriter.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.streaming.DataStreamWriter

import org.apache.spark.sql.streaming.Trigger
df.writeStream.trigger(Trigger.ProcessingTime("1 second"))

See
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
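
For completeness, a minimal sketch that ties this back to example (b) from the
question, with the trigger playing the role of the batch interval; the console
sink and topic name are only illustrative:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("nest-logs-stream").getOrCreate()

val parsed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "nest-logs")
  .load()
  .selectExpr("CAST(value AS STRING) AS value")

val query = parsed.writeStream
  .format("console")                              // illustrative sink
  .trigger(Trigger.ProcessingTime("10 seconds"))  // micro-batch interval equivalent
  .start()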

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Thu, May 24, 2018 at 10:14 PM, Peter Liu  wrote:

> Hi there,
>
> from my apache spark streaming website (see links below),
>
>- the batch-interval is set when a spark StreamingContext is
>constructed (see example (a) quoted below)
>- the StreamingContext is available in older and new Spark version
>(v1.6, v2.2 to v2.3.0) (see https://spark.apache.org/docs/
>1.6.0/streaming-programming-guide.html
>
>and https://spark.apache.org/docs/2.3.0/streaming-programming-
>guide.html )
>- however, example (b) below  doesn't use StreamingContext, but
>StreamingSession object to setup a streaming flow;
>
> What does the usage difference in (a) and (b) mean? I was wondering if
> this would mean a different streaming approach ("traditional" streaming vs
> structured streaming?
>
> Basically I need to find a way to set the batch-interval in (b), similar
> as in (a) below.
>
> Would be great if someone can please share some insights here.
>
> Thanks!
>
> Peter
>
> (a)
> https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html )
>
> import org.apache.spark._import org.apache.spark.streaming._
> val conf = new SparkConf().setAppName(appName).setMaster(master)val *ssc *= 
> new StreamingContext(conf, Seconds(1))
>
>
> (b)
> ( from databricks' https://databricks.com/blog/
> 2017/04/26/processing-data-in-apache-kafka-with-structured-
> streaming-in-apache-spark-2-2.html)
>
>val *spark *= SparkSession.builder()
> .appName(appName)
>   .getOrCreate()
> ...
>
> jsonOptions = { "timestampFormat": nestTimestampFormat }
> parsed = *spark *\
>   .readStream \
>   .format("kafka") \
>   .option("kafka.bootstrap.servers", "localhost:9092") \
>   .option("subscribe", "nest-logs") \
>   .load() \
>   .select(from_json(col("value").cast("string"), schema, 
> jsonOptions).alias("parsed_value"))
>
>
>
>
>


re: help with streaming batch interval question needed

2018-05-24 Thread Peter Liu
 Hi there,

from my apache spark streaming website (see links below),

   - the batch-interval is set when a spark StreamingContext is constructed
   (see example (a) quoted below)
   - the StreamingContext is available in older and new Spark version
   (v1.6, v2.2 to v2.3.0) (see
   https://spark.apache.org/docs/1.6.0/streaming-programming-guide.html
   and https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html
   )
   - however, example (b) below  doesn't use StreamingContext, but
   StreamingSession object to setup a streaming flow;

What does the usage difference between (a) and (b) mean? I was wondering if
this implies a different streaming approach ("traditional" streaming vs.
structured streaming)?

Basically I need to find a way to set the batch-interval in (b), similar as
in (a) below.

Would be great if someone can please share some insights here.

Thanks!

Peter

(a)
https://spark.apache.org/docs/2.3.0/streaming-programming-guide.html )

import org.apache.spark._import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)val
*ssc *= new StreamingContext(conf, Seconds(1))


(b)
( from databricks'
https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html
)

   val *spark *= SparkSession.builder()
.appName(appName)
  .getOrCreate()
...

jsonOptions = { "timestampFormat": nestTimestampFormat }
parsed = *spark *\
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "nest-logs") \
  .load() \
  .select(from_json(col("value").cast("string"), schema,
jsonOptions).alias("parsed_value"))


Re: help in copying data from one azure subscription to another azure subscription

2018-05-23 Thread Pushkar.Gujar
What are you using for storing data in those subscriptions? Data Lake or
Blobs? Azure Data Factory is already available and can copy between these
cloud storage services without having to go through Spark.
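
If it does have to go through Spark, a hedged sketch of a blob-to-blob copy
would look roughly like this; it assumes the hadoop-azure and azure-storage
jars are on the classpath, and the account, key, container and path names are
placeholders:

val hconf = spark.sparkContext.hadoopConfiguration
hconf.set("fs.azure.account.key.sourceacct.blob.core.windows.net", sourceKey)
hconf.set("fs.azure.account.key.destacct.blob.core.windows.net", destKey)

spark.read.parquet("wasbs://data@sourceacct.blob.core.windows.net/input")
  .write.parquet("wasbs://data@destacct.blob.core.windows.net/output")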


Thank you,
*Pushkar Gujar*


On Mon, May 21, 2018 at 8:59 AM, amit kumar singh 
wrote:

> HI Team,
>
> We are trying to move data between one azure subscription to another azure
> subscription is there a faster way to do through spark
>
> i am using distcp and its taking for ever
>
> thanks
> rohit
>


Re: Help Required - Unable to run spark-submit on YARN client mode

2018-05-08 Thread Deepak Sharma
Can you try increasing the partition for the base RDD/dataframe that you
are working on?
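
A minimal sketch of that suggestion; the partition count of 400 is only an
assumption to be tuned to the data size:

val repartitioned = baseDf.repartition(400)           // baseDf: the DataFrame the job starts from
spark.conf.set("spark.sql.shuffle.partitions", "400") // raises parallelism for SQL shuffles too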


On Tue, May 8, 2018 at 5:05 PM, Debabrata Ghosh 
wrote:

> Hi Everyone,
> I have been trying to run spark-shell in YARN client mode, but am getting
> lot of ClosedChannelException errors, however the program works fine on
> local mode.  I am using spark 2.2.0 build for Hadoop 2.7.3.  If you are
> familiar with this error, please can you help with the possible resolution.
>
> Any help would be greatly appreciated!
>
> Here is the error message:
>
> 18/05/08 00:01:18 ERROR TransportClient: Failed to send RPC
> 7905321254854295784 to /9.30.94.43:60220: java.nio.channels.
> ClosedChannelException
> java.nio.channels.ClosedChannelException
> at io.netty.channel.AbstractChannel$AbstractUnsafe.write(...)(Unknown
> Source)
> 18/05/08 00:01:18 ERROR YarnSchedulerBackend$YarnSchedulerEndpoint:
> Sending RequestExecutors(5,0,Map(),Set()) to AM was unsuccessful
> java.io.IOException: Failed to send RPC 7905321254854295784 to /
> 9.30.94.43:60220: java.nio.channels.ClosedChannelException
> at org.apache.spark.network.client.TransportClient.lambda$
> sendRpc$2(TransportClient.java:237)
> at io.netty.util.concurrent.DefaultPromise.notifyListener0(
> DefaultPromise.java:507)
> at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(
> DefaultPromise.java:481)
> at io.netty.util.concurrent.DefaultPromise.access$000(
> DefaultPromise.java:34)
> at io.netty.util.concurrent.DefaultPromise$1.run(
> DefaultPromise.java:431)
> at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(
> SingleThreadEventExecutor.java:399)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:446)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.
> run(SingleThreadEventExecutor.java:131)
> at io.netty.util.concurrent.DefaultThreadFactory$
> DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.nio.channels.ClosedChannelException
>
> Cheers,
>
> Debu
>



-- 
Thanks
Deepak
www.bigdatabig.com
www.keosha.net


Re: Help Required on Spark - Convert DataFrame to List with out using collect

2017-12-22 Thread Naresh Dulam
Hi Sunitha,

Make the class that contains the common function you are calling implement
Serializable.
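
A hedged Scala sketch of that idea (the Java equivalent is
implements java.io.Serializable); the class and method names are placeholders:

class CommonService extends Serializable {
  def process(id: Long, name: String): Unit = {
    // shared business logic called from inside Spark closures goes here
  }
}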


Thank you,
Naresh

On Wed, Dec 20, 2017 at 9:58 PM Sunitha Chennareddy <
chennareddysuni...@gmail.com> wrote:

> Hi,
>
> Thank You All..
>
> Here is my requirement, I have a dataframe which contains list of rows
> retrieved from oracle table.
> I need to iterate dataframe and fetch each record and call a common
> function by passing few parameters.
>
> Issue I am facing is : I am not able to call common function
>
> JavaRDD personRDD = person_dataframe.toJavaRDD().map(new
> Function() {
>   @Override
>   public Person call(Row row)  throws Exception{
>   Person person = new Person();
>   person.setId(row.getDecimal(0).longValue());
>   person.setName(row.getString(1));
>
> personLst.add(person);
> return person;
>   }
> });
>
> personRDD.foreach(new VoidFunction() {
> private static final long serialVersionUID = 1123456L;
>
> @Override
> public void call(Person person) throws Exception
> {
>   System.out.println(person.getId());
> Here I tried to call common function 
> }
>});
>
> I am able to print data in foreach loop, however if I tried to call common
> function it gives me below error
> Error Message :  org.apache.spark.SparkException: Task not serializable
>
> I kindly request you to share some idea(sample code / link to refer) on
> how to call a common function/Interace method by passing values in each
> record of the dataframe.
>
> Regards,
> Sunitha
>
>
> On Tue, Dec 19, 2017 at 1:20 PM, Weichen Xu 
> wrote:
>
>> Hi Sunitha,
>>
>> In the mapper function, you cannot update outer variables such as 
>> `personLst.add(person)`,
>> this won't work so that's the reason you got an empty list.
>>
>> You can use `rdd.collect()` to get a local list of `Person` objects
>> first, then you can safely iterate on the local list and do any update you
>> want.
>>
>> Thanks.
>>
>> On Tue, Dec 19, 2017 at 2:16 PM, Sunitha Chennareddy <
>> chennareddysuni...@gmail.com> wrote:
>>
>>> Hi Deepak,
>>>
>>> I am able to map row to person class, issue is I want to to call another
>>> method.
>>> I tried converting to list and its not working with out using collect.
>>>
>>> Regards
>>> Sunitha
>>> On Tuesday, December 19, 2017, Deepak Sharma 
>>> wrote:
>>>
 I am not sure about java but in scala it would be something like
 df.rdd.map{ x => MyClass(x.getString(0),.)}

 HTH

 --Deepak

 On Dec 19, 2017 09:25, "Sunitha Chennareddy" > wrote:

 Hi All,

 I am new to Spark, I want to convert DataFrame to List with
 out using collect().

 Main requirement is I need to iterate through the rows of dataframe and
 call another function by passing column value of each row (person.getId())

 Here is the snippet I have tried, Kindly help me to resolve the issue,
 personLst is returning 0:

 List personLst= new ArrayList();
 JavaRDD personRDD = person_dataframe.toJavaRDD().map(new
 Function() {
   public Person call(Row row)  throws Exception{
   Person person = new Person();
   person.setId(row.getDecimal(0).longValue());
   person.setName(row.getString(1));

 personLst.add(person);
 // here I tried to call another function but control never passed
 return person;
   }
 });
 logger.info("personLst size =="+personLst.size());
 logger.info("personRDD count ==="+personRDD.count());

 //output is
 personLst size == 0
 personRDD count === 3



>>
>


Re: Help Required on Spark - Convert DataFrame to List with out using collect

2017-12-20 Thread Sunitha Chennareddy
Hi,

Thank You All..

Here is my requirement, I have a dataframe which contains list of rows
retrieved from oracle table.
I need to iterate dataframe and fetch each record and call a common
function by passing few parameters.

Issue I am facing is : I am not able to call common function

JavaRDD<Person> personRDD = person_dataframe.toJavaRDD().map(new Function<Row, Person>() {
    @Override
    public Person call(Row row) throws Exception {
        Person person = new Person();
        person.setId(row.getDecimal(0).longValue());
        person.setName(row.getString(1));

        personLst.add(person);
        return person;
    }
});

personRDD.foreach(new VoidFunction<Person>() {
    private static final long serialVersionUID = 1123456L;

    @Override
    public void call(Person person) throws Exception {
        System.out.println(person.getId());
        // here I tried to call the common function
    }
});

I am able to print data in foreach loop, however if I tried to call common
function it gives me below error
Error Message :  org.apache.spark.SparkException: Task not serializable

I kindly request you to share some idea(sample code / link to refer) on how
to call a common function/Interace method by passing values in each record
of the dataframe.

Regards,
Sunitha


On Tue, Dec 19, 2017 at 1:20 PM, Weichen Xu 
wrote:

> Hi Sunitha,
>
> In the mapper function, you cannot update outer variables such as 
> `personLst.add(person)`,
> this won't work so that's the reason you got an empty list.
>
> You can use `rdd.collect()` to get a local list of `Person` objects
> first, then you can safely iterate on the local list and do any update you
> want.
>
> Thanks.
>
> On Tue, Dec 19, 2017 at 2:16 PM, Sunitha Chennareddy <
> chennareddysuni...@gmail.com> wrote:
>
>> Hi Deepak,
>>
>> I am able to map row to person class, issue is I want to to call another
>> method.
>> I tried converting to list and its not working with out using collect.
>>
>> Regards
>> Sunitha
>> On Tuesday, December 19, 2017, Deepak Sharma 
>> wrote:
>>
>>> I am not sure about java but in scala it would be something like
>>> df.rdd.map{ x => MyClass(x.getString(0),.)}
>>>
>>> HTH
>>>
>>> --Deepak
>>>
>>> On Dec 19, 2017 09:25, "Sunitha Chennareddy" >> > wrote:
>>>
>>> Hi All,
>>>
>>> I am new to Spark, I want to convert DataFrame to List with
>>> out using collect().
>>>
>>> Main requirement is I need to iterate through the rows of dataframe and
>>> call another function by passing column value of each row (person.getId())
>>>
>>> Here is the snippet I have tried, Kindly help me to resolve the issue,
>>> personLst is returning 0:
>>>
>>> List personLst= new ArrayList();
>>> JavaRDD personRDD = person_dataframe.toJavaRDD().map(new
>>> Function() {
>>>   public Person call(Row row)  throws Exception{
>>>   Person person = new Person();
>>>   person.setId(row.getDecimal(0).longValue());
>>>   person.setName(row.getString(1));
>>>
>>> personLst.add(person);
>>> // here I tried to call another function but control never passed
>>> return person;
>>>   }
>>> });
>>> logger.info("personLst size =="+personLst.size());
>>> logger.info("personRDD count ==="+personRDD.count());
>>>
>>> //output is
>>> personLst size == 0
>>> personRDD count === 3
>>>
>>>
>>>
>


Re: Help Required on Spark - Convert DataFrame to List with out using collect

2017-12-18 Thread Weichen Xu
Hi Sunitha,

In the mapper function, you cannot update outer variables such as
`personLst.add(person)`,
this won't work so that's the reason you got an empty list.

You can use `rdd.collect()` to get a local list of `Person` objects first,
then you can safely iterate on the local list and do any update you want.

Thanks.

On Tue, Dec 19, 2017 at 2:16 PM, Sunitha Chennareddy <
chennareddysuni...@gmail.com> wrote:

> Hi Deepak,
>
> I am able to map row to person class, issue is I want to to call another
> method.
> I tried converting to list and its not working with out using collect.
>
> Regards
> Sunitha
> On Tuesday, December 19, 2017, Deepak Sharma 
> wrote:
>
>> I am not sure about java but in scala it would be something like
>> df.rdd.map{ x => MyClass(x.getString(0),.)}
>>
>> HTH
>>
>> --Deepak
>>
>> On Dec 19, 2017 09:25, "Sunitha Chennareddy" > > wrote:
>>
>> Hi All,
>>
>> I am new to Spark, I want to convert DataFrame to List with
>> out using collect().
>>
>> Main requirement is I need to iterate through the rows of dataframe and
>> call another function by passing column value of each row (person.getId())
>>
>> Here is the snippet I have tried, Kindly help me to resolve the issue,
>> personLst is returning 0:
>>
>> List personLst= new ArrayList();
>> JavaRDD personRDD = person_dataframe.toJavaRDD().map(new
>> Function() {
>>   public Person call(Row row)  throws Exception{
>>   Person person = new Person();
>>   person.setId(row.getDecimal(0).longValue());
>>   person.setName(row.getString(1));
>>
>> personLst.add(person);
>> // here I tried to call another function but control never passed
>> return person;
>>   }
>> });
>> logger.info("personLst size =="+personLst.size());
>> logger.info("personRDD count ==="+personRDD.count());
>>
>> //output is
>> personLst size == 0
>> personRDD count === 3
>>
>>
>>


Re: Help Required on Spark - Convert DataFrame to List with out using collect

2017-12-18 Thread Sunitha Chennareddy
Hi Jorn,

In my case I have to call common interface function
by passing the values of each rdd. So I have tried iterating , but I was
not able to trigger common function from call method as commented in the
snippet code in my earlier mail.

Request you please share your views.

Regards
Sunitha

On Tuesday, December 19, 2017, Jörn Franke  wrote:

> This is correct behavior. If you need to call another method simply append
> another map, flatmap or whatever you need.
>
> Depending on your use case you may use also reduce and reduce by key.
> However you never (!) should use a global variable as in your snippet.
> This can to work because you work in a distributed setting.
> Probably the code will fail on a cluster or at random.
>
> On 19. Dec 2017, at 07:16, Sunitha Chennareddy <
> chennareddysuni...@gmail.com> wrote:
>
> Hi Deepak,
>
> I am able to map row to person class, issue is I want to to call another
> method.
> I tried converting to list and its not working with out using collect.
>
> Regards
> Sunitha
> On Tuesday, December 19, 2017, Deepak Sharma 
> wrote:
>
>> I am not sure about java but in scala it would be something like
>> df.rdd.map{ x => MyClass(x.getString(0),.)}
>>
>> HTH
>>
>> --Deepak
>>
>> On Dec 19, 2017 09:25, "Sunitha Chennareddy" > > wrote:
>>
>> Hi All,
>>
>> I am new to Spark, I want to convert DataFrame to List with
>> out using collect().
>>
>> Main requirement is I need to iterate through the rows of dataframe and
>> call another function by passing column value of each row (person.getId())
>>
>> Here is the snippet I have tried, Kindly help me to resolve the issue,
>> personLst is returning 0:
>>
>> List personLst= new ArrayList();
>> JavaRDD personRDD = person_dataframe.toJavaRDD().map(new
>> Function() {
>>   public Person call(Row row)  throws Exception{
>>   Person person = new Person();
>>   person.setId(row.getDecimal(0).longValue());
>>   person.setName(row.getString(1));
>>
>> personLst.add(person);
>> // here I tried to call another function but control never passed
>> return person;
>>   }
>> });
>> logger.info("personLst size =="+personLst.size());
>> logger.info("personRDD count ==="+personRDD.count());
>>
>> //output is
>> personLst size == 0
>> personRDD count === 3
>>
>>
>>


Re: Help Required on Spark - Convert DataFrame to List with out using collect

2017-12-18 Thread Jörn Franke
This is correct behavior. If you need to call another method, simply append
another map, flatMap or whatever you need.

Depending on your use case you may also use reduce and reduceByKey.
However, you should never (!) use a global variable as in your snippet. This
cannot work, because you are working in a distributed setting.
The code will probably fail on a cluster, or fail at random.
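
A hedged Scala sketch of that shape (the same structure works with the Java
API); Person and CommonService stand in for the classes in the original
snippet:

case class Person(id: Long, name: String)

class CommonService extends Serializable {
  def process(p: Person): Unit = println(p.id)   // placeholder for the real logic
}

val personRdd = person_dataframe.rdd.map { row =>
  Person(row.getDecimal(0).longValue(), row.getString(1))
}

personRdd.foreachPartition { people =>
  val service = new CommonService()              // created per partition, on the executor
  people.foreach(service.process)
}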

> On 19. Dec 2017, at 07:16, Sunitha Chennareddy  
> wrote:
> 
> Hi Deepak,
> 
> I am able to map row to person class, issue is I want to to call another 
> method.
> I tried converting to list and its not working with out using collect.
>  
> Regards
> Sunitha
>> On Tuesday, December 19, 2017, Deepak Sharma  wrote:
>> I am not sure about java but in scala it would be something like df.rdd.map{ 
>> x => MyClass(x.getString(0),.)}
>> 
>> HTH
>> 
>> --Deepak
>> 
>> On Dec 19, 2017 09:25, "Sunitha Chennareddy"  wrote:
>> Hi All,
>> 
>> I am new to Spark, I want to convert DataFrame to List with out 
>> using collect().
>> 
>> Main requirement is I need to iterate through the rows of dataframe and call 
>> another function by passing column value of each row (person.getId())
>> 
>> Here is the snippet I have tried, Kindly help me to resolve the issue, 
>> personLst is returning 0:
>> 
>> List personLst= new ArrayList(); 
>> JavaRDD personRDD = person_dataframe.toJavaRDD().map(new 
>> Function() {
>>  
>>public Person call(Row row)  throws Exception{
>>Person person = new Person();
>>
>> person.setId(row.getDecimal(0).longValue());
>>person.setName(row.getString(1)); 
>>
>>  personLst.add(person);
>>  // here I tried to call another 
>> function but control never passed
>>  return person;
>>}
>>  });
>>  
>> logger.info("personLst size =="+personLst.size());
>> logger.info("personRDD count ==="+personRDD.count());
>> 
>> //output is 
>> personLst size == 0
>> personRDD count === 3
>>  
>> 


Re: Help Required on Spark - Convert DataFrame to List with out using collect

2017-12-18 Thread Sunitha Chennareddy
Hi Deepak,

I am able to map row to person class, issue is I want to to call another
method.
I tried converting to list and its not working with out using collect.

Regards
Sunitha
On Tuesday, December 19, 2017, Deepak Sharma  wrote:

> I am not sure about java but in scala it would be something like
> df.rdd.map{ x => MyClass(x.getString(0),.)}
>
> HTH
>
> --Deepak
>
> On Dec 19, 2017 09:25, "Sunitha Chennareddy"  > wrote:
>
> Hi All,
>
> I am new to Spark, I want to convert DataFrame to List with out
> using collect().
>
> Main requirement is I need to iterate through the rows of dataframe and
> call another function by passing column value of each row (person.getId())
>
> Here is the snippet I have tried, Kindly help me to resolve the issue,
> personLst is returning 0:
>
> List personLst= new ArrayList();
> JavaRDD personRDD = person_dataframe.toJavaRDD().map(new
> Function() {
>   public Person call(Row row)  throws Exception{
>   Person person = new Person();
>   person.setId(row.getDecimal(0).longValue());
>   person.setName(row.getString(1));
>
> personLst.add(person);
> // here I tried to call another function but control never passed
> return person;
>   }
> });
> logger.info("personLst size =="+personLst.size());
> logger.info("personRDD count ==="+personRDD.count());
>
> //output is
> personLst size == 0
> personRDD count === 3
>
>
>


Re: Help Required on Spark - Convert DataFrame to List with out using collect

2017-12-18 Thread Deepak Sharma
I am not sure about java but in scala it would be something like
df.rdd.map{ x => MyClass(x.getString(0),.)}

HTH

--Deepak

On Dec 19, 2017 09:25, "Sunitha Chennareddy" 
wrote:

Hi All,

I am new to Spark, I want to convert DataFrame to List with out
using collect().

Main requirement is I need to iterate through the rows of dataframe and
call another function by passing column value of each row (person.getId())

Here is the snippet I have tried, Kindly help me to resolve the issue,
personLst is returning 0:

List<Person> personLst = new ArrayList<Person>();
JavaRDD<Person> personRDD = person_dataframe.toJavaRDD().map(new Function<Row, Person>() {
  public Person call(Row row)  throws Exception{
  Person person = new Person();
  person.setId(row.getDecimal(0).longValue());
  person.setName(row.getString(1));

personLst.add(person);
// here I tried to call another function but control never passed
return person;
  }
});
logger.info("personLst size =="+personLst.size());
logger.info("personRDD count ==="+personRDD.count());

//output is
personLst size == 0
personRDD count === 3


Re: Help taking last value in each group (aggregates)

2017-08-28 Thread Everett Anderson
I'm still unclear on whether orderBy/groupBy + aggregates is a viable approach,
or when one can rely on the last or first aggregate functions, but a working
alternative is to use window functions with row_number and a filter, kind of
like this:

import spark.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{row_number, sum}

val reverseOrdering = Seq("a", "b").map(c => df(c).desc)

val windowSpec = Window.partitionBy("group_id").orderBy(reverseOrdering: _*)
// give the sum an explicit full-partition frame; with an orderBy the default
// frame is a running frame, which would not be the group total
val sumSpec = windowSpec.rowsBetween(Long.MinValue, Long.MaxValue)

df.select($"group_id",
    $"row_id",
    sum("col_to_sum").over(sumSpec).as("total"),
    row_number().over(windowSpec).as("row_number"))
  .filter("row_number == 1")
  .select($"group_id",
    $"row_id".as("last_row_id"),
    $"total")

Would love to know if there's a better way!
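
One alternative sketch, worth verifying on 2.0.x: pack the ordering columns and
the wanted value into a struct and take the max of that struct, so the group
total and the "last" row_id come out of a single groupBy pass:

import org.apache.spark.sql.functions.{col, max, struct, sum}

val result = df.groupBy("group_id")
  .agg(
    sum("col_to_sum").as("total"),
    max(struct(col("a"), col("b"), col("row_id"))).getField("row_id").as("last_row_id"))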

On Mon, Aug 28, 2017 at 9:19 AM, Everett Anderson  wrote:

> Hi,
>
> I'm struggling a little with some unintuitive behavior with the Scala API.
> (Spark 2.0.2)
>
> I wrote something like
>
> df.orderBy("a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>last("row_id").as("last_row_id")))
>
> and expected a result with a unique group_id column, a column called
> "total" that's the sum of all col_to_sum in each group, and a column called
> "last_row_id" that's the last row_id seen in each group when the groups are
> sorted by columns a and b.
>
> However, the result is actually non-deterministic and changes based on the
> initial sorting and partitioning of df.
>
> I also tried
>
> df.orderBy("group_id", "a", "b")
>   .groupBy("group_id")
>   .agg(sum("col_to_sum").as("total"),
>last("row_id").as("last_row_id")))
>
> thinking the problem might be that the groupBy does another shuffle that
> loses the ordering, but that also doesn't seem to work.
>
> Looking through the code
> ,
> both the Last and First aggregate functions have this comment:
>
> Even if [[Last]] is used on an already sorted column, if
> we do partial aggregation and final aggregation
> (when mergeExpression
> is used) its result will not be deterministic
> (unless the input table is sorted and has
> a single partition, and we use a single reducer to do the aggregation.).
>
>
> Some questions:
>
>1. What's the best way to take some values from the last row in an
>ordered group while performing some other aggregates over the entire group?
>
>2. Given these comments on last and first, when would these functions
>be useful? It would be rare to bring an entire Spark table to a single
>partition.
>
> Thanks!
>
>


Re: Help in Parsing 'Categorical' type of data

2017-06-23 Thread Yanbo Liang
Please consider using other classification models such as logistic regression
or GBT. Naive Bayes treats features as counts, which is not suitable for
features generated by a one-hot encoder.
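
A hedged Scala sketch of that suggestion; the column names gender, age and
label are assumptions for illustration, and the Java API is analogous. Note
that tree ensembles such as GBT can also consume the StringIndexer output
directly (its metadata marks the column as categorical), without one-hot
encoding.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val indexer   = new StringIndexer().setInputCol("gender").setOutputCol("genderIdx")
val encoder   = new OneHotEncoder().setInputCol("genderIdx").setOutputCol("genderVec")
val assembler = new VectorAssembler()
  .setInputCols(Array("genderVec", "age"))
  .setOutputCol("features")
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, lr))
// val model = pipeline.fit(trainingDf)   // trainingDf is assumed to exist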

Thanks
Yanbo

On Wed, May 31, 2017 at 3:58 PM, Amlan Jyoti  wrote:

> Hi,
>
> I am trying to run Naive Bayes Model using Spark ML libraries, in Java.
> The sample snippet of dataset is given below:
>
> *Raw Data* -
>
>
> But, as the input data needs to in numeric, so I am using
> *one-hot-encoder* on the Gender field[m->0,1][f->1,0]; and the finally
> the 'features' vector is inputted to Model, and I could get the Output.
>
> *Transformed Data* -
>
>
> But the model *results are not correct *as the 'Gender' field[Originally,
> Categorical] is now considered as a continuous field after one-hot encoding
> transformations.
>
> *Expectation* is that - for 'continuous data', mean and variance ; and
> for 'categorical data', the number of occurrences of different categories,
> is to be calculated. [In, my case, mean and variances are calculated even
> for the Gender Field].
>
> So, is there any way by which I can indicate to the model that a
> particular data field is 'categorical' by nature?
>
> Thanks
>
> Best Regards
> Amlan Jyoti
>
>
>
>


Re: help/suggestions to setup spark cluster

2017-04-27 Thread Cody Koeninger
You can just cap the cores used per job.

http://spark.apache.org/docs/latest/spark-standalone.html

http://spark.apache.org/docs/latest/spark-standalone.html#resource-scheduling
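
A minimal sketch of capping resources per application on a standalone cluster
so a streaming app, hourly batch jobs and Zeppelin can share it; the values
are illustrative and the same properties can be passed to spark-submit with
--conf:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hourly-batch-job")
  .config("spark.cores.max", "4")         // cap total cores this app may take
  .config("spark.executor.memory", "4g")
  .getOrCreate()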

On Thu, Apr 27, 2017 at 1:07 AM, vincent gromakowski
 wrote:
> Spark standalone is not multi tenant you need one clusters per job. Maybe
> you can try fair scheduling and use one cluster but i doubt it will be prod
> ready...
>
> Le 27 avr. 2017 5:28 AM, "anna stax"  a écrit :
>>
>> Thanks Cody,
>>
>> As I already mentioned I am running spark streaming on EC2 cluster in
>> standalone mode. Now in addition to streaming, I want to be able to run
>> spark batch job hourly and adhoc queries using Zeppelin.
>>
>> Can you please confirm that a standalone cluster is OK for this. Please
>> provide me some links to help me get started.
>>
>> Thanks
>> -Anna
>>
>> On Wed, Apr 26, 2017 at 7:46 PM, Cody Koeninger 
>> wrote:
>>>
>>> The standalone cluster manager is fine for production.  Don't use Yarn
>>> or Mesos unless you already have another need for it.
>>>
>>> On Wed, Apr 26, 2017 at 4:53 PM, anna stax  wrote:
>>> > Hi Sam,
>>> >
>>> > Thank you for the reply.
>>> >
>>> > What do you mean by
>>> > I doubt people run spark in a. Single EC2 instance, certainly not in
>>> > production I don't think
>>> >
>>> > What is wrong in having a data pipeline on EC2 that reads data from
>>> > kafka,
>>> > processes using spark and outputs to cassandra? Please explain.
>>> >
>>> > Thanks
>>> > -Anna
>>> >
>>> > On Wed, Apr 26, 2017 at 2:22 PM, Sam Elamin 
>>> > wrote:
>>> >>
>>> >> Hi Anna
>>> >>
>>> >> There are a variety of options for launching spark clusters. I doubt
>>> >> people run spark in a. Single EC2 instance, certainly not in
>>> >> production I
>>> >> don't think
>>> >>
>>> >> I don't have enough information of what you are trying to do but if
>>> >> you
>>> >> are just trying to set things up from scratch then I think you can
>>> >> just use
>>> >> EMR which will create a cluster for you and attach a zeppelin instance
>>> >> as
>>> >> well
>>> >>
>>> >>
>>> >> You can also use databricks for ease of use and very little management
>>> >> but
>>> >> you will pay a premium for that abstraction
>>> >>
>>> >>
>>> >> Regards
>>> >> Sam
>>> >> On Wed, 26 Apr 2017 at 22:02, anna stax  wrote:
>>> >>>
>>> >>> I need to setup a spark cluster for Spark streaming and scheduled
>>> >>> batch
>>> >>> jobs and adhoc queries.
>>> >>> Please give me some suggestions. Can this be done in standalone mode.
>>> >>>
>>> >>> Right now we have a spark cluster in standalone mode on AWS EC2
>>> >>> running
>>> >>> spark streaming application. Can we run spark batch jobs and zeppelin
>>> >>> on the
>>> >>> same. Do we need a better resource manager like Mesos?
>>> >>>
>>> >>> Are there any companies or individuals that can help in setting this
>>> >>> up?
>>> >>>
>>> >>> Thank you.
>>> >>>
>>> >>> -Anna
>>> >
>>> >
>>
>>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: help/suggestions to setup spark cluster

2017-04-27 Thread vincent gromakowski
Spark standalone is not multi-tenant; you need one cluster per job. Maybe you
can try fair scheduling and use one cluster, but I doubt it will be production
ready...

Le 27 avr. 2017 5:28 AM, "anna stax"  a écrit :

> Thanks Cody,
>
> As I already mentioned I am running spark streaming on EC2 cluster in
> standalone mode. Now in addition to streaming, I want to be able to run
> spark batch job hourly and adhoc queries using Zeppelin.
>
> Can you please confirm that a standalone cluster is OK for this. Please
> provide me some links to help me get started.
>
> Thanks
> -Anna
>
> On Wed, Apr 26, 2017 at 7:46 PM, Cody Koeninger 
> wrote:
>
>> The standalone cluster manager is fine for production.  Don't use Yarn
>> or Mesos unless you already have another need for it.
>>
>> On Wed, Apr 26, 2017 at 4:53 PM, anna stax  wrote:
>> > Hi Sam,
>> >
>> > Thank you for the reply.
>> >
>> > What do you mean by
>> > I doubt people run spark in a. Single EC2 instance, certainly not in
>> > production I don't think
>> >
>> > What is wrong in having a data pipeline on EC2 that reads data from
>> kafka,
>> > processes using spark and outputs to cassandra? Please explain.
>> >
>> > Thanks
>> > -Anna
>> >
>> > On Wed, Apr 26, 2017 at 2:22 PM, Sam Elamin 
>> wrote:
>> >>
>> >> Hi Anna
>> >>
>> >> There are a variety of options for launching spark clusters. I doubt
>> >> people run spark in a. Single EC2 instance, certainly not in
>> production I
>> >> don't think
>> >>
>> >> I don't have enough information of what you are trying to do but if you
>> >> are just trying to set things up from scratch then I think you can
>> just use
>> >> EMR which will create a cluster for you and attach a zeppelin instance
>> as
>> >> well
>> >>
>> >>
>> >> You can also use databricks for ease of use and very little management
>> but
>> >> you will pay a premium for that abstraction
>> >>
>> >>
>> >> Regards
>> >> Sam
>> >> On Wed, 26 Apr 2017 at 22:02, anna stax  wrote:
>> >>>
>> >>> I need to setup a spark cluster for Spark streaming and scheduled
>> batch
>> >>> jobs and adhoc queries.
>> >>> Please give me some suggestions. Can this be done in standalone mode.
>> >>>
>> >>> Right now we have a spark cluster in standalone mode on AWS EC2
>> running
>> >>> spark streaming application. Can we run spark batch jobs and zeppelin
>> on the
>> >>> same. Do we need a better resource manager like Mesos?
>> >>>
>> >>> Are there any companies or individuals that can help in setting this
>> up?
>> >>>
>> >>> Thank you.
>> >>>
>> >>> -Anna
>> >
>> >
>>
>
>


Re: help/suggestions to setup spark cluster

2017-04-26 Thread anna stax
Thanks Cody,

As I already mentioned, I am running Spark Streaming on an EC2 cluster in
standalone mode. Now, in addition to streaming, I want to be able to run a
Spark batch job hourly and ad hoc queries using Zeppelin.

Can you please confirm that a standalone cluster is OK for this? Please
provide me some links to help me get started.

Thanks
-Anna

On Wed, Apr 26, 2017 at 7:46 PM, Cody Koeninger  wrote:

> The standalone cluster manager is fine for production.  Don't use Yarn
> or Mesos unless you already have another need for it.
>
> On Wed, Apr 26, 2017 at 4:53 PM, anna stax  wrote:
> > Hi Sam,
> >
> > Thank you for the reply.
> >
> > What do you mean by
> > I doubt people run spark in a. Single EC2 instance, certainly not in
> > production I don't think
> >
> > What is wrong in having a data pipeline on EC2 that reads data from
> kafka,
> > processes using spark and outputs to cassandra? Please explain.
> >
> > Thanks
> > -Anna
> >
> > On Wed, Apr 26, 2017 at 2:22 PM, Sam Elamin 
> wrote:
> >>
> >> Hi Anna
> >>
> >> There are a variety of options for launching spark clusters. I doubt
> >> people run spark in a. Single EC2 instance, certainly not in production
> I
> >> don't think
> >>
> >> I don't have enough information of what you are trying to do but if you
> >> are just trying to set things up from scratch then I think you can just
> use
> >> EMR which will create a cluster for you and attach a zeppelin instance
> as
> >> well
> >>
> >>
> >> You can also use databricks for ease of use and very little management
> but
> >> you will pay a premium for that abstraction
> >>
> >>
> >> Regards
> >> Sam
> >> On Wed, 26 Apr 2017 at 22:02, anna stax  wrote:
> >>>
> >>> I need to setup a spark cluster for Spark streaming and scheduled batch
> >>> jobs and adhoc queries.
> >>> Please give me some suggestions. Can this be done in standalone mode.
> >>>
> >>> Right now we have a spark cluster in standalone mode on AWS EC2 running
> >>> spark streaming application. Can we run spark batch jobs and zeppelin
> on the
> >>> same. Do we need a better resource manager like Mesos?
> >>>
> >>> Are there any companies or individuals that can help in setting this
> up?
> >>>
> >>> Thank you.
> >>>
> >>> -Anna
> >
> >
>


Re: help/suggestions to setup spark cluster

2017-04-26 Thread Cody Koeninger
The standalone cluster manager is fine for production.  Don't use Yarn
or Mesos unless you already have another need for it.

On Wed, Apr 26, 2017 at 4:53 PM, anna stax  wrote:
> Hi Sam,
>
> Thank you for the reply.
>
> What do you mean by
> I doubt people run spark in a. Single EC2 instance, certainly not in
> production I don't think
>
> What is wrong in having a data pipeline on EC2 that reads data from kafka,
> processes using spark and outputs to cassandra? Please explain.
>
> Thanks
> -Anna
>
> On Wed, Apr 26, 2017 at 2:22 PM, Sam Elamin  wrote:
>>
>> Hi Anna
>>
>> There are a variety of options for launching spark clusters. I doubt
>> people run spark in a. Single EC2 instance, certainly not in production I
>> don't think
>>
>> I don't have enough information of what you are trying to do but if you
>> are just trying to set things up from scratch then I think you can just use
>> EMR which will create a cluster for you and attach a zeppelin instance as
>> well
>>
>>
>> You can also use databricks for ease of use and very little management but
>> you will pay a premium for that abstraction
>>
>>
>> Regards
>> Sam
>> On Wed, 26 Apr 2017 at 22:02, anna stax  wrote:
>>>
>>> I need to setup a spark cluster for Spark streaming and scheduled batch
>>> jobs and adhoc queries.
>>> Please give me some suggestions. Can this be done in standalone mode.
>>>
>>> Right now we have a spark cluster in standalone mode on AWS EC2 running
>>> spark streaming application. Can we run spark batch jobs and zeppelin on the
>>> same. Do we need a better resource manager like Mesos?
>>>
>>> Are there any companies or individuals that can help in setting this up?
>>>
>>> Thank you.
>>>
>>> -Anna
>
>

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



Re: help/suggestions to setup spark cluster

2017-04-26 Thread anna stax
Hi Sam,

Thank you for the reply.

What do you mean by
I doubt people run spark in a. Single EC2 instance, certainly not in
production I don't think

What is wrong with having a data pipeline on EC2 that reads data from Kafka,
processes it using Spark, and outputs to Cassandra? Please explain.

Thanks
-Anna

On Wed, Apr 26, 2017 at 2:22 PM, Sam Elamin  wrote:

> Hi Anna
>
> There are a variety of options for launching spark clusters. I doubt
> people run spark in a. Single EC2 instance, certainly not in production I
> don't think
>
> I don't have enough information of what you are trying to do but if you
> are just trying to set things up from scratch then I think you can just use
> EMR which will create a cluster for you and attach a zeppelin instance as
> well
>
>
> You can also use databricks for ease of use and very little management but
> you will pay a premium for that abstraction
>
>
> Regards
> Sam
> On Wed, 26 Apr 2017 at 22:02, anna stax  wrote:
>
>> I need to setup a spark cluster for Spark streaming and scheduled batch
>> jobs and adhoc queries.
>> Please give me some suggestions. Can this be done in standalone mode.
>>
>> Right now we have a spark cluster in standalone mode on AWS EC2 running
>> spark streaming application. Can we run spark batch jobs and zeppelin on
>> the same. Do we need a better resource manager like Mesos?
>>
>> Are there any companies or individuals that can help in setting this up?
>>
>> Thank you.
>>
>> -Anna
>>
>


Re: help/suggestions to setup spark cluster

2017-04-26 Thread Sam Elamin
Hi Anna

There are a variety of options for launching Spark clusters. I doubt people
run Spark on a single EC2 instance, certainly not in production, I don't
think.

I don't have enough information about what you are trying to do, but if you
are just setting things up from scratch then I think you can just use EMR,
which will create a cluster for you and attach a Zeppelin instance as well.

You can also use Databricks for ease of use and very little management, but
you will pay a premium for that abstraction.


Regards
Sam
On Wed, 26 Apr 2017 at 22:02, anna stax  wrote:

> I need to setup a spark cluster for Spark streaming and scheduled batch
> jobs and adhoc queries.
> Please give me some suggestions. Can this be done in standalone mode.
>
> Right now we have a spark cluster in standalone mode on AWS EC2 running
> spark streaming application. Can we run spark batch jobs and zeppelin on
> the same. Do we need a better resource manager like Mesos?
>
> Are there any companies or individuals that can help in setting this up?
>
> Thank you.
>
> -Anna
>


Re: help!!!----issue with spark-sql type cast form long to longwritable

2017-01-30 Thread Alex
Hi All,

If I modify the code as below, the Hive UDF works in spark-sql but it
gives different results. Please let me know the difference between the
two versions of the code below.

1) public Object get(Object name) {
      // Version 1: looks up the field's PrimitiveObjectInspector and reads the value
      // through it, returning plain java.lang Double/Long/String objects.
      int pos = getPos((String) name);
      if (pos < 0) return null;
      String f = "string";
      Object obj = list.get(pos);
      Object result = null;
      if (obj == null) return null;
      ObjectInspector ins =
          ((StructField) colnames.get(pos)).getFieldObjectInspector();
      if (ins != null) f = ins.getTypeName();
      PrimitiveObjectInspector ins2 = (PrimitiveObjectInspector) ins;
      switch (ins2.getPrimitiveCategory()) {
          case DOUBLE: {
              result = new Double(((DoubleObjectInspector) ins2).get(obj));
              break;
          }
          case LONG:
              result = new Long(((LongObjectInspector) ins2).get(obj));
              break;
          case STRING:
              result = ((StringObjectInspector) ins2).getPrimitiveJavaObject(obj);
              break;
          default:
              result = obj;
      }
      return result;
}






2) public Object get(Object name) {
      // Version 2: switches on the type name and casts the raw field value to the
      // Hadoop writable classes (DoubleWritable/LongWritable/Text) before unwrapping it.
      int pos = getPos((String) name);
      if (pos < 0) return null;
      String f = "string";
      Object obj = list.get(pos);
      if (obj == null) return null;
      ObjectInspector ins =
          ((StructField) colnames.get(pos)).getFieldObjectInspector();
      if (ins != null) f = ins.getTypeName();
      switch (f) {
          case "double" :  return ((DoubleWritable) obj).get();
          case "bigint" :  return ((LongWritable) obj).get();
          case "string" :  return ((Text) obj).toString();
          default       :  return obj;
      }
}

On Tue, Jan 24, 2017 at 5:29 PM, Sirisha Cheruvu  wrote:

> Hi Team,
>
> I am trying to keep below code in get method and calling that get mthod in
> another hive UDF
> and running the hive UDF using Hive Context.sql procedure..
>
>
> switch (f) {
> case "double" :  return ((DoubleWritable)obj).get();
> case "bigint" :  return ((LongWritable)obj).get();
> case "string" :  return ((Text)obj).toString();
> default  :  return obj;
>   }
> }
>
> Suprisingly only LongWritable and Text convrsions are throwing error but
> DoubleWritable is working
> So I tried changing below code to
>
> switch (f) {
> case "double" :  return ((DoubleWritable)obj).get();
> case "bigint" :  return ((DoubleWritable)obj).get();
> case "string" :  return ((Text)obj).toString();
> default  :  return obj;
>   }
> }
>
> Still its throws error saying Java.Lang.Long cant be convrted
> to org.apache.hadoop.hive.serde2.io.DoubleWritable
>
>
>
> its working fine on hive but throwing error on spark-sql
>
> I am importing the below packages.
> import java.util.*;
> import org.apache.hadoop.hive.serde2.objectinspector.*;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.hive.serde2.io.DoubleWritable;
>
> .Please let me know why it is making issue in spark when perfectly running
> fine on hive
>


Re: help!!!----issue with spark-sql type cast form long to longwritable

2017-01-30 Thread Alex
How do I debug Hive UDFs?!

On Jan 24, 2017 5:29 PM, "Sirisha Cheruvu"  wrote:

> Hi Team,
>
> I am trying to keep below code in get method and calling that get mthod in
> another hive UDF
> and running the hive UDF using Hive Context.sql procedure..
>
>
> switch (f) {
> case "double" :  return ((DoubleWritable)obj).get();
> case "bigint" :  return ((LongWritable)obj).get();
> case "string" :  return ((Text)obj).toString();
> default  :  return obj;
>   }
> }
>
> Suprisingly only LongWritable and Text convrsions are throwing error but
> DoubleWritable is working
> So I tried changing below code to
>
> switch (f) {
> case "double" :  return ((DoubleWritable)obj).get();
> case "bigint" :  return ((DoubleWritable)obj).get();
> case "string" :  return ((Text)obj).toString();
> default  :  return obj;
>   }
> }
>
> Still its throws error saying Java.Lang.Long cant be convrted
> to org.apache.hadoop.hive.serde2.io.DoubleWritable
>
>
>
> its working fine on hive but throwing error on spark-sql
>
> I am importing the below packages.
> import java.util.*;
> import org.apache.hadoop.hive.serde2.objectinspector.*;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.hive.serde2.io.DoubleWritable;
>
> .Please let me know why it is making issue in spark when perfectly running
> fine on hive
>


Re: help!!!----issue with spark-sql type cast form long to longwritable

2017-01-30 Thread Alex
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error:
java.lang.Double cannot be cast to
org.apache.hadoop.hive.serde2.io.DoubleWritable]

I am getting this error while running the Hive UDF on Spark, but the UDF
works perfectly fine in Hive.


public Object get(Object name) {
  int pos = getPos((String)name);
 if(pos<0) return null;
 String f = "string";
  Object obj= list.get(pos);
 if(obj==null) return null;
 ObjectInspector ins =
((StructField)colnames.get(pos)).getFieldObjectInspector();
 if(ins!=null) f = ins.getTypeName();
 switch (f) {
   case "double" :  return ((DoubleWritable)obj).get();
case "bigint" :  return ((LongWritable)obj).get();
case "string" :  return ((Text)obj).toString();
   default  :  return obj;
 }
}



On Tue, Jan 24, 2017 at 9:19 PM, Takeshi Yamamuro 
wrote:

> Hi,
>
> Could you show us the whole code to reproduce that?
>
> // maropu
>
> On Wed, Jan 25, 2017 at 12:02 AM, Deepak Sharma 
> wrote:
>
>> Can you try writing the UDF directly in spark and register it with spark
>> sql or hive context ?
>> Or do you want to reuse the existing UDF jar for hive in spark ?
>>
>> Thanks
>> Deepak
>>
>> On Jan 24, 2017 5:29 PM, "Sirisha Cheruvu"  wrote:
>>
>>> Hi Team,
>>>
>>> I am trying to keep below code in get method and calling that get mthod
>>> in another hive UDF
>>> and running the hive UDF using Hive Context.sql procedure..
>>>
>>>
>>> switch (f) {
>>> case "double" :  return ((DoubleWritable)obj).get();
>>> case "bigint" :  return ((LongWritable)obj).get();
>>> case "string" :  return ((Text)obj).toString();
>>> default  :  return obj;
>>>   }
>>> }
>>>
>>> Suprisingly only LongWritable and Text convrsions are throwing error but
>>> DoubleWritable is working
>>> So I tried changing below code to
>>>
>>> switch (f) {
>>> case "double" :  return ((DoubleWritable)obj).get();
>>> case "bigint" :  return ((DoubleWritable)obj).get();
>>> case "string" :  return ((Text)obj).toString();
>>> default  :  return obj;
>>>   }
>>> }
>>>
>>> Still its throws error saying Java.Lang.Long cant be convrted
>>> to org.apache.hadoop.hive.serde2.io.DoubleWritable
>>>
>>>
>>>
>>> its working fine on hive but throwing error on spark-sql
>>>
>>> I am importing the below packages.
>>> import java.util.*;
>>> import org.apache.hadoop.hive.serde2.objectinspector.*;
>>> import org.apache.hadoop.io.LongWritable;
>>> import org.apache.hadoop.io.Text;
>>> import org.apache.hadoop.hive.serde2.io.DoubleWritable;
>>>
>>> .Please let me know why it is making issue in spark when perfectly
>>> running fine on hive
>>>
>>
>
>
> --
> ---
> Takeshi Yamamuro
>


Re: help!!!----issue with spark-sql type cast form long to longwritable

2017-01-24 Thread Takeshi Yamamuro
Hi,

Could you show us the whole code to reproduce that?

// maropu

On Wed, Jan 25, 2017 at 12:02 AM, Deepak Sharma 
wrote:

> Can you try writing the UDF directly in spark and register it with spark
> sql or hive context ?
> Or do you want to reuse the existing UDF jar for hive in spark ?
>
> Thanks
> Deepak
>
> On Jan 24, 2017 5:29 PM, "Sirisha Cheruvu"  wrote:
>
>> Hi Team,
>>
>> I am trying to keep below code in get method and calling that get mthod
>> in another hive UDF
>> and running the hive UDF using Hive Context.sql procedure..
>>
>>
>> switch (f) {
>> case "double" :  return ((DoubleWritable)obj).get();
>> case "bigint" :  return ((LongWritable)obj).get();
>> case "string" :  return ((Text)obj).toString();
>> default  :  return obj;
>>   }
>> }
>>
>> Suprisingly only LongWritable and Text convrsions are throwing error but
>> DoubleWritable is working
>> So I tried changing below code to
>>
>> switch (f) {
>> case "double" :  return ((DoubleWritable)obj).get();
>> case "bigint" :  return ((DoubleWritable)obj).get();
>> case "string" :  return ((Text)obj).toString();
>> default  :  return obj;
>>   }
>> }
>>
>> Still its throws error saying Java.Lang.Long cant be convrted
>> to org.apache.hadoop.hive.serde2.io.DoubleWritable
>>
>>
>>
>> its working fine on hive but throwing error on spark-sql
>>
>> I am importing the below packages.
>> import java.util.*;
>> import org.apache.hadoop.hive.serde2.objectinspector.*;
>> import org.apache.hadoop.io.LongWritable;
>> import org.apache.hadoop.io.Text;
>> import org.apache.hadoop.hive.serde2.io.DoubleWritable;
>>
>> .Please let me know why it is making issue in spark when perfectly
>> running fine on hive
>>
>


-- 
---
Takeshi Yamamuro


Re: help!!!----issue with spark-sql type cast form long to longwritable

2017-01-24 Thread Deepak Sharma
Can you try writing the UDF directly in spark and register it with spark
sql or hive context ?
Or do you want to reuse the existing UDF jar for hive in spark ?
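
A minimal sketch of that approach, registering a native Spark SQL UDF
instead of reusing the Hive UDF jar (the function and column names here are
illustrative, not from the original UDF):

    // In a native UDF, Spark hands you plain Scala types (Long, Double, String),
    // so no Writable casts are needed.
    hiveContext.udf.register("formatAmount", (amount: Double) => f"$amount%.2f")

    hiveContext.sql("SELECT formatAmount(qtdAmount) FROM some_table").show()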

Thanks
Deepak

On Jan 24, 2017 5:29 PM, "Sirisha Cheruvu"  wrote:

> Hi Team,
>
> I am trying to keep below code in get method and calling that get mthod in
> another hive UDF
> and running the hive UDF using Hive Context.sql procedure..
>
>
> switch (f) {
> case "double" :  return ((DoubleWritable)obj).get();
> case "bigint" :  return ((LongWritable)obj).get();
> case "string" :  return ((Text)obj).toString();
> default  :  return obj;
>   }
> }
>
> Suprisingly only LongWritable and Text convrsions are throwing error but
> DoubleWritable is working
> So I tried changing below code to
>
> switch (f) {
> case "double" :  return ((DoubleWritable)obj).get();
> case "bigint" :  return ((DoubleWritable)obj).get();
> case "string" :  return ((Text)obj).toString();
> default  :  return obj;
>   }
> }
>
> Still its throws error saying Java.Lang.Long cant be convrted
> to org.apache.hadoop.hive.serde2.io.DoubleWritable
>
>
>
> its working fine on hive but throwing error on spark-sql
>
> I am importing the below packages.
> import java.util.*;
> import org.apache.hadoop.hive.serde2.objectinspector.*;
> import org.apache.hadoop.io.LongWritable;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.hive.serde2.io.DoubleWritable;
>
> .Please let me know why it is making issue in spark when perfectly running
> fine on hive
>


Re: Help in generating unique Id in spark row

2017-01-05 Thread Olivier Girardot
There is a way: you can use
org.apache.spark.sql.functions.monotonicallyIncreasingId. It will give each
row of your dataframe a unique id.
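
A minimal sketch of that approach (the new column name is illustrative; note
the generated ids are unique within a run but are not stable across
recomputation or job restarts):

    import org.apache.spark.sql.functions.monotonicallyIncreasingId

    // Adds a 64-bit id that is unique per row within this job run.
    val withId = rawDataDf.withColumn("rowId", monotonicallyIncreasingId())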
 





On Tue, Oct 18, 2016 10:36 AM, ayan guha guha.a...@gmail.com
wrote:
Do you have any primary key or unique identifier in your data? Even if multiple
columns can make a composite key? In other words, can your data have exactly
same 2 rows with different unique ID? Also, do you have to have numeric ID? 

You may want to pursue hashing algorithm such as sha group to convert single or
composite unique columns to an ID. 

On 18 Oct 2016 15:32, "Saurav Sinha"  wrote:
Can any one help me out
On Mon, Oct 17, 2016 at 7:27 PM, Saurav Sinha   wrote:
Hi,
I am in situation where I want to generate unique Id for each row.
I have use monotonicallyIncreasingId but it is giving increasing values and
start generating from start if it fail.
I have two question here:
Q1. Does this method give me unique id even in failure situation becaue I want
to use that ID in my solr id.
Q2. If answer to previous question is NO. Then Is there way yo generate UUID for
each row which is uniqe and not updatedable.
As I have come up with situation where UUID is updated

val idUDF = udf(() => UUID.randomUUID().toString)
val a = withColumn("alarmUUID", lit(idUDF()))
a.persist(StorageLevel.MEMORY_AND_DISK)
rawDataDf.registerTempTable("rawAlarms")

// I do some joines

but as I reach further below
I do sonthing like
b is transformation of a
sqlContext.sql("""Select a.alarmUUID,b.alarmUUID
                  from a right outer join b on a.alarmUUID = b.alarmUUID""")

it give output as

+--------------------+--------------------+
|           alarmUUID|           alarmUUID|
+--------------------+--------------------+
|7d33a516-5532-410...|                null|
|                null|2439d6db-16a2-44b...|
+--------------------+--------------------+


-- 
Thanks and Regards,
Saurav Sinha
Contact: 9742879062


-- 
Thanks and Regards,
Saurav Sinha
Contact: 9742879062

 

Olivier Girardot| Associé
o.girar...@lateral-thoughts.com
+33 6 24 09 17 94

RE: Help needed in parsing JSon with nested structures

2016-10-31 Thread Jan Botorek
Hello,
From my point of view, it would be more efficient and probably more
"readable" if you just extracted the required data using some JSON parsing
library (GSON, Jackson), constructed a global object (or pre-processed the
data), and then began the Spark operations.
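
If the data has to stay in Spark, a sketch of an alternative that avoids the
Cartesian-product problem described below is to explode the array of tax
structs once and then read its fields (column names follow the schema in the
quoted message; this is an illustration, not a tested solution):

    import org.apache.spark.sql.functions.explode

    // One explode over the array of structs keeps code/qtdAmount/ytdAmount
    // paired, instead of exploding each field separately.
    val taxes = cdm.select(
      explode(cdm("businessEntity").getItem(0)
        .getField("payGroup").getItem(0)
        .getField("reportingPeriod")
        .getField("worker").getItem(0)
        .getField("tax")).as("tax"))

    val codeAmounts = taxes.select(
      taxes("tax").getField("code").as("code"),
      taxes("tax").getField("qtdAmount").as("qtdAmount"))

    // Build the requested map keyed by code + "qtdAmount".
    val amountsByCode: Map[String, Double] = codeAmounts.collect()
      .map(r => (r.getString(0) + "qtdAmount") -> r.getDouble(1))
      .toMap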

Jan

From: Kappaganthu, Sivaram (ES) [mailto:sivaram.kappagan...@adp.com]
Sent: Monday, October 31, 2016 11:50 AM
To: user@spark.apache.org
Subject: Help needed in parsing JSon with nested structures

Hello All,



I am processing a complex nested JSON, and below is the schema for it.
root
 |-- businessEntity: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- payGroup: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- reportingPeriod: struct (nullable = true)
 |    |    |    |    |    |-- worker: array (nullable = true)
 |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |-- category: string (nullable = true)
 |    |    |    |    |    |    |    |-- person: struct (nullable = true)
 |    |    |    |    |    |    |    |-- tax: array (nullable = true)
 |    |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- qtdAmount: double (nullable = true)
 |    |    |    |    |    |    |    |    |    |-- ytdAmount: double (nullable =
My requirement is to create a hashmap with code concatenated with "qtdAmount" as
the key and the value of qtdAmount as the value, i.e. Map.put(code + "qtdAmount", qtdAmount).
How can I do this with Spark?
I tried with below shell commands.
import org.apache.spark.sql._
val sqlcontext = new SQLContext(sc)
val cdm = sqlcontext.read.json("/user/edureka/CDM/cdm.json")
val spark = 
SparkSession.builder().appName("SQL").config("spark.some.config.option","some-vale").getOrCreate()
cdm.createOrReplaceTempView("CDM")
val sqlDF = spark.sql("SELECT businessEntity[0].payGroup[0] from CDM").show()
val address = spark.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker[0].person.address from CDM 
as address")
val worker = spark.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker from CDM")
val tax = spark.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM")
val tax = sqlcontext.sql("SELECT 
businessEntity[0].payGroup[0].reportingPeriod.worker[0].tax from CDM")
val codes = tax.select(expode(tax("code"))
scala> val codes = 
tax.withColumn("code",explode(tax("tax.code"))).withColumn("qtdAmount",explode(tax("tax.qtdAmount"))).withColumn("ytdAmount",explode(tax("tax.ytdAmount")))


I am trying to get all the codes and qtdAmount values into a map, but I am not
getting it. Using multiple explode statements on a single DataFrame produces a
Cartesian product of the elements.
Could someone please help on how to parse a JSON this complex in Spark?


Thanks,
Sivaram


This message and any attachments are intended only for the use of the addressee 
and may contain information that is privileged and confidential. If the reader 
of the message is not the intended recipient or an authorized representative of 
the intended recipient, you are hereby notified that any dissemination of this 
communication is strictly prohibited. If you have received this communication 
in error, notify the sender immediately by return email and delete the message 
and any attachments from your system.


Re: Help in generating unique Id in spark row

2016-10-18 Thread ayan guha
Do you have any primary key or unique identifier in your data? Even if
multiple columns can make a composite key? In other words, can your data
have exactly the same 2 rows with different unique IDs? Also, do you have
to have a numeric ID?

You may want to pursue a hashing algorithm such as SHA to convert single
or composite unique columns to an ID.
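
A minimal sketch of the hashing approach (the two key columns here are
illustrative placeholders, not from the thread):

    import org.apache.spark.sql.functions.{concat_ws, sha2}

    // Derive a deterministic id from the columns that together identify a row,
    // so the same row always hashes to the same id even after recomputation.
    val withId = rawDataDf.withColumn(
      "alarmUUID",
      sha2(concat_ws("|", rawDataDf("sourceId"), rawDataDf("eventTime")), 256))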
On 18 Oct 2016 15:32, "Saurav Sinha"  wrote:

> Can any one help me out
>
> On Mon, Oct 17, 2016 at 7:27 PM, Saurav Sinha 
> wrote:
>
>> Hi,
>>
>> I am in situation where I want to generate unique Id for each row.
>>
>> I have use monotonicallyIncreasingId but it is giving increasing values
>> and start generating from start if it fail.
>>
>> I have two question here:
>>
>> Q1. Does this method give me unique id even in failure situation becaue I
>> want to use that ID in my solr id.
>>
>> Q2. If answer to previous question is NO. Then Is there way yo generate
>> UUID for each row which is uniqe and not updatedable.
>>
>> As I have come up with situation where UUID is updated
>>
>>
>> val idUDF = udf(() => UUID.randomUUID().toString)
>> val a = withColumn("alarmUUID", lit(idUDF()))
>> a.persist(StorageLevel.MEMORY_AND_DISK)
>> rawDataDf.registerTempTable("rawAlarms")
>>
>> ///
>> /// I do some joines
>>
>> but as I reach further below
>>
>> I do sonthing like
>> b is transformation of a
>> sqlContext.sql("""Select a.alarmUUID,b.alarmUUID
>>   from a right outer join b on a.alarmUUID =
>> b.alarmUUID""")
>>
>> it give output as
>>
>> +++
>>
>> |   alarmUUID|   alarmUUID|
>> +++
>> |7d33a516-5532-410...|null|
>> |null|2439d6db-16a2-44b...|
>> +++
>>
>>
>>
>> --
>> Thanks and Regards,
>>
>> Saurav Sinha
>>
>> Contact: 9742879062
>>
>
>
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>


Re: Help in generating unique Id in spark row

2016-10-17 Thread Saurav Sinha
Can anyone help me out?

On Mon, Oct 17, 2016 at 7:27 PM, Saurav Sinha 
wrote:

> Hi,
>
> I am in situation where I want to generate unique Id for each row.
>
> I have use monotonicallyIncreasingId but it is giving increasing values
> and start generating from start if it fail.
>
> I have two question here:
>
> Q1. Does this method give me unique id even in failure situation becaue I
> want to use that ID in my solr id.
>
> Q2. If answer to previous question is NO. Then Is there way yo generate
> UUID for each row which is uniqe and not updatedable.
>
> As I have come up with situation where UUID is updated
>
>
> val idUDF = udf(() => UUID.randomUUID().toString)
> val a = withColumn("alarmUUID", lit(idUDF()))
> a.persist(StorageLevel.MEMORY_AND_DISK)
> rawDataDf.registerTempTable("rawAlarms")
>
> ///
> /// I do some joines
>
> but as I reach further below
>
> I do sonthing like
> b is transformation of a
> sqlContext.sql("""Select a.alarmUUID,b.alarmUUID
>   from a right outer join b on a.alarmUUID =
> b.alarmUUID""")
>
> it give output as
>
> +++
>
> |   alarmUUID|   alarmUUID|
> +++
> |7d33a516-5532-410...|null|
> |null|2439d6db-16a2-44b...|
> +++
>
>
>
> --
> Thanks and Regards,
>
> Saurav Sinha
>
> Contact: 9742879062
>



-- 
Thanks and Regards,

Saurav Sinha

Contact: 9742879062


Re: Help with Jupyter Notebook Settup on CDH using Anaconda

2016-09-03 Thread Marco Mistroni
Hi,
  please paste the exception.
For Spark vs Jupyter, you might want to sign up for this.
It'll give you Jupyter and Spark... and presumably spark-csv is already
part of it?

https://community.cloud.databricks.com/login.html

hth
 marco



On Sat, Sep 3, 2016 at 8:10 PM, Arif,Mubaraka  wrote:

> On the on-premise *Cloudera Hadoop 5.7.2* I have installed the anaconda
> package and trying to *setup Jupyter notebook *to work with spark1.6.
>
>
>
> I have ran into problems when I trying to use the package
> *com.databricks:spark-csv_2.10:1.4.0* for *reading and inferring the
> schema of the csv file using python spark*.
>
>
>
> I have installed the* jar file - spark-csv_2.10-1.4.0.jar *in
> */var/opt/teradata/cloudera/parcels/CDH-5.7.2-1.cdh5.7.2.p0.18/jar* and c
> *onfigurations* are set as  :
>
>
>
> export PYSPARK_DRIVER_PYTHON=/var/opt/teradata/cloudera/parcels/
> Anaconda-4.0.0/bin/jupyter
> export PYSPARK_DRIVER_PYTHON_OPTS="notebook --NotebookApp.open_browser=False
> --NotebookApp.ip='*' --NotebookApp.port=8083"
> export PYSPARK_PYTHON=/var/opt/teradata/cloudera/parcels/
> Anaconda-4.0.0/bin/python
>
>
>
> When I run pyspark from the command line with packages option, like :
>
>
>
> *$pyspark --packages com.databricks:spark-csv_2.10:1.4.0 *
>
>
>
> It throws the error and fails to recognize the added dependency.
>
>
>
> Any ideas on how to resolve this error is much appreciated.
>
>
>
> Also, any ideas on the experience in installing and running Jupyter
> notebook with anaconda and spark please share.
>
>
>
> thanks,
>
> Muby
>
>
>
>
> - To
> unsubscribe e-mail: user-unsubscr...@spark.apache.org


Re: Help testing the Spark Extensions for the Apache Bahir 2.0.0 release

2016-08-07 Thread Luciano Resende
Simple, just help us test the available extensions using Spark 2.0.0...
preferably in real workloads that you might be using in your day-to-day
usage of Spark.

I wrote a quick getting started for using the new MQTT Structured Streaming
on my blog, which can serve as an example:

http://lresende.blogspot.gr/2016/08/getting-started-with-apache-bahir-mqtt_4.html


On Sun, Aug 7, 2016 at 11:24 AM, Sivakumaran S  wrote:

> Hi,
>
> How can I help?
>
> regards,
>
> Sivakumaran S
>
> On 06-Aug-2016, at 6:18 PM, Luciano Resende  wrote:
>
> Apache Bahir is voting it's 2.0.0 release based on Apache Spark 2.0.0.
>
> https://www.mail-archive.com/dev@bahir.apache.org/msg00312.html
>
> We appreciate any help reviewing/testing the release, which contains the
> following Apache Spark extensions:
>
> Akka DStream connector
> MQTT DStream connector
> Twitter DStream connector
> ZeroMQ DStream connector
>
> MQTT Structured Streaming
>
> Thanks in advance
>
> --
> Luciano Resende
> http://twitter.com/lresende1975
> http://lresende.blogspot.com/
>
>
>


-- 
Luciano Resende
http://twitter.com/lresende1975
http://lresende.blogspot.com/


Re: Help testing the Spark Extensions for the Apache Bahir 2.0.0 release

2016-08-07 Thread Sivakumaran S
Hi,

How can I help? 

regards,

Sivakumaran S
> On 06-Aug-2016, at 6:18 PM, Luciano Resende  wrote:
> 
> Apache Bahir is voting it's 2.0.0 release based on Apache Spark 2.0.0. 
> 
> https://www.mail-archive.com/dev@bahir.apache.org/msg00312.html 
> 
> 
> We appreciate any help reviewing/testing the release, which contains the 
> following Apache Spark extensions:
> 
> Akka DStream connector
> MQTT DStream connector
> Twitter DStream connector
> ZeroMQ DStream connector
> 
> MQTT Structured Streaming
> 
> Thanks in advance
> 
> -- 
> Luciano Resende
> http://twitter.com/lresende1975 
> http://lresende.blogspot.com/ 


Re: [HELP:]Save Spark Dataframe in Phoenix Table

2016-04-08 Thread Josh Mahonin
Hi Divya,

That's strange. Are you able to post a snippet of your code to look at? And
are you sure that you're saving the dataframes as per the docs (
https://phoenix.apache.org/phoenix_spark.html)?

Depending on your HDP version, it may or may not actually have
phoenix-spark support. Double-check that your Spark configuration is set up
with the right worker/driver classpath settings, and that the Phoenix JARs
contain the necessary phoenix-spark classes
(e.g. org.apache.phoenix.spark.PhoenixRelation). If not, I suggest
following up with Hortonworks.
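
For reference, a minimal sketch of the save path shown in the linked docs
(table name and ZooKeeper quorum are placeholders):

    import org.apache.spark.sql.SaveMode

    df.write
      .format("org.apache.phoenix.spark")
      .mode(SaveMode.Overwrite)
      .option("table", "OUTPUT_TABLE")
      .option("zkUrl", "zkhost:2181")
      .save()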

Josh



On Fri, Apr 8, 2016 at 1:22 AM, Divya Gehlot 
wrote:

> Hi,
> I hava a Hortonworks Hadoop cluster having below Configurations :
> Spark 1.5.2
> HBASE 1.1.x
> Phoenix 4.4
>
> I am able to connect to Phoenix through JDBC connection and able to read
> the Phoenix tables .
> But while writing the data back to Phoenix table
> I am getting below error :
>
> org.apache.spark.sql.AnalysisException:
> org.apache.phoenix.spark.DefaultSource does not allow user-specified
> schemas.;
>
> Can any body help in resolving the above errors or any other solution of
> saving Spark Dataframes to Phoenix.
>
> Would really appareciate the help.
>
> Thanks,
> Divya
>


Re: help coercing types

2016-03-19 Thread Jacek Laskowski
Hi,

Just a side question: why do you convert DataFrame to RDD? It's like
driving backwards (possible but ineffective and dangerous at times)

P. S. I'd even go for Dataset.
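
A sketch of what keeping the types explicit could look like, assuming both
selected columns are strings (cast them in the select/SQL first otherwise):

    import org.apache.spark.rdd.RDD

    // Read typed values out of each Row instead of t(0)/t(1), which erase
    // the column types to Any.
    val typed: RDD[(String, String)] =
      zdata.rdd.map(row => (row.getString(0), row.getString(1)))

    val grouped: RDD[(String, Iterable[String])] = typed.groupByKey()

    val asArrays: RDD[Array[String]] =
      grouped.map { case (key, values) => (key +: values.toSeq).toArray }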

Jacek
18.03.2016 5:20 PM "Bauer, Robert"  napisał(a):

> I have data that I pull in using a sql context and then I convert to an
> rdd.
>
>
>
> The problem is that the type in the rdd is [Any, Iterable[Any]]
>
>
>
> And I need to have the type RDD[Array[String]]   -- convert the Iterable
> to an Array.
>
>
>
> Here’s more detail:
>
>
>
> val zdata = sqlContext.read.parquet("s3://.. parquet").select('Pk,
> explode('Pg) as "P").select($"Pk", $"P.A.n")
>
>
>
> val r1data = zdata.rdd
>
>
>
> val r2data = r1data.map(t => (t(0),t(1))).groupByKey()
>
>
>
> and at this point r2data’s type is [Any, Iterable[Any]]
>
>
>
> robert
>
>
>
> --
>
> This message (including any attachments) contains confidential and/or
> privileged information. It is intended for a specific individual and
> purpose and is protected by law. If you are not the intended recipient,
> please notify the sender immediately and delete this message. Any
> disclosure, copying, or distribution of this message, or the taking of any
> action based on it, is strictly prohibited.
>


Re: [Help]: DataframeNAfunction fill method throwing exception

2016-03-01 Thread ai he
Hi Divya,

I guess the error is thrown from spark-csv. Spark-csv tries to parse the
string "null" as a double.

The workaround is to add the nullValue option, like .option("nullValue",
"null"). But this nullValue feature is not included in the current spark-csv
1.3 release. Just check out the master branch of spark-csv and use the local
Ivy build to make it work.
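
A sketch of the read with that option, assuming a spark-csv build that
includes nullValue (master at the time, or a later release):

    val dfnulltest = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("nullValue", "null")   // treat the literal string "null" as SQL NULL
      .schema(nulltestSchema)
      .load("hdfs://xx.xx.xx.xxx:8020/TestDivya/Spark/nulltest.csv")

    val dfchangenull = dfnulltest.na.fill(0, Seq("age"))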

Best,
Ai

On Thu, Feb 25, 2016 at 11:34 PM Divya Gehlot 
wrote:

> Hi Jan ,
> Thanks for help.
> Alas..
> you suggestion also didnt work
>
> scala> import org.apache.spark.sql.types.{StructType, StructField,
>> StringType,IntegerType,LongType,DoubleType, FloatType};
>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>> IntegerType, LongType, DoubleType, FloatType}
>> scala> val nulltestSchema = StructType(Seq(StructField("name",
>> StringType, false),StructField("age", DoubleType, true)))
>> nulltestSchema: org.apache.spark.sql.types.StructType =
>> StructType(StructField(name,StringType,false),
>> StructField(age,DoubleType,true))
>>
> scala> val dfnulltest =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").schema(nulltestSchema).load("hdfs://xx.xx.xx.xxx:8020/TestDivya/Spark/nulltest.csv")
>
>
>> dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]
>>
> scala> dfnulltest.selectExpr("name", "coalesce(age, 0) as age")
>> res0: org.apache.spark.sql.DataFrame = [name: string, age: double]
>> scala> val dfresult = dfnulltest.selectExpr("name", "coalesce(age, 0) as
>> age")
>> dfresult: org.apache.spark.sql.DataFrame = [name: string, age: double]
>> scala> dfresult.show
>
>
>  java.text.ParseException: Unparseable number: "null"
> at java.text.NumberFormat.parse(NumberFormat.java:350)
>
>
> On 26 February 2016 at 15:15, Jan Štěrba  wrote:
>
>> just use coalesce function
>>
>> df.selectExpr("name", "coalesce(age, 0) as age")
>>
>> --
>> Jan Sterba
>> https://twitter.com/honzasterba | http://flickr.com/honzasterba |
>> http://500px.com/honzasterba
>>
>> On Fri, Feb 26, 2016 at 5:27 AM, Divya Gehlot 
>> wrote:
>>
>>> Hi,
>>> I have dataset which looks like below
>>> name age
>>> alice 35
>>> bob null
>>> peter 24
>>> I need to replace null values of columns with 0
>>> so  I referred Spark API DataframeNAfunctions.scala
>>> 
>>>
>>>  I tried the below code its throwing exception
>>> scala> import org.apache.spark.sql.types.{StructType, StructField,
>>> StringType,IntegerType,LongType,DoubleType, FloatType};
>>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>>> IntegerType, LongType, DoubleType, FloatType}
>>>
>>> scala> val nulltestSchema = StructType(Seq(StructField("name",
>>> StringType, false),StructField("age", DoubleType, true)))
>>> nulltestSchema: org.apache.spark.sql.types.StructType =
>>> StructType(StructField(name,StringType,false),
>>> StructField(age,DoubleType,true))
>>>
>>> scala> val dfnulltest =
>>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>>> "true").schema(nulltestSchema).load("hdfs://
>>> 172.31.29.201:8020/TestDivya/Spark/nulltest.csv")
>>> dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]
>>>
>>> scala> val dfchangenull =
>>> dfnulltest.na.fill(0,Seq("age")).select("name","age")
>>> dfchangenull: org.apache.spark.sql.DataFrame = [name: string, age:
>>> double]
>>>
>>> scala> dfchangenull.show
>>> 16/02/25 23:15:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID
>>> 2, ip-172-31-22-135.ap-southeast-1.compute.internal):
>>> java.text.ParseException: Unparseable number: "null"
>>> at java.text.NumberFormat.parse(NumberFormat.java:350)
>>>
>>>
>>
>>
>


Re: [Help]: DataframeNAfunction fill method throwing exception

2016-02-25 Thread Divya Gehlot
Hi Jan,
Thanks for the help.
Alas, your suggestion also didn't work.

scala> import org.apache.spark.sql.types.{StructType, StructField,
> StringType,IntegerType,LongType,DoubleType, FloatType};
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType, LongType, DoubleType, FloatType}
> scala> val nulltestSchema = StructType(Seq(StructField("name", StringType,
> false),StructField("age", DoubleType, true)))
> nulltestSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField(name,StringType,false),
> StructField(age,DoubleType,true))
> scala> val dfnulltest =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").schema(nulltestSchema).load("hdfs://xx.xx.xx.xxx:8020/TestDivya/Spark/nulltest.csv")
> dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]
> scala> dfnulltest.selectExpr("name", "coalesce(age, 0) as age")
> res0: org.apache.spark.sql.DataFrame = [name: string, age: double]
> scala> val dfresult = dfnulltest.selectExpr("name", "coalesce(age, 0) as
> age")
> dfresult: org.apache.spark.sql.DataFrame = [name: string, age: double]
> scala> dfresult.show


 java.text.ParseException: Unparseable number: "null"
at java.text.NumberFormat.parse(NumberFormat.java:350)


On 26 February 2016 at 15:15, Jan Štěrba  wrote:

> just use coalesce function
>
> df.selectExpr("name", "coalesce(age, 0) as age")
>
> --
> Jan Sterba
> https://twitter.com/honzasterba | http://flickr.com/honzasterba |
> http://500px.com/honzasterba
>
> On Fri, Feb 26, 2016 at 5:27 AM, Divya Gehlot 
> wrote:
>
>> Hi,
>> I have dataset which looks like below
>> name age
>> alice 35
>> bob null
>> peter 24
>> I need to replace null values of columns with 0
>> so  I referred Spark API DataframeNAfunctions.scala
>> 
>>
>>  I tried the below code its throwing exception
>> scala> import org.apache.spark.sql.types.{StructType, StructField,
>> StringType,IntegerType,LongType,DoubleType, FloatType};
>> import org.apache.spark.sql.types.{StructType, StructField, StringType,
>> IntegerType, LongType, DoubleType, FloatType}
>>
>> scala> val nulltestSchema = StructType(Seq(StructField("name",
>> StringType, false),StructField("age", DoubleType, true)))
>> nulltestSchema: org.apache.spark.sql.types.StructType =
>> StructType(StructField(name,StringType,false),
>> StructField(age,DoubleType,true))
>>
>> scala> val dfnulltest =
>> sqlContext.read.format("com.databricks.spark.csv").option("header",
>> "true").schema(nulltestSchema).load("hdfs://
>> 172.31.29.201:8020/TestDivya/Spark/nulltest.csv")
>> dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]
>>
>> scala> val dfchangenull =
>> dfnulltest.na.fill(0,Seq("age")).select("name","age")
>> dfchangenull: org.apache.spark.sql.DataFrame = [name: string, age: double]
>>
>> scala> dfchangenull.show
>> 16/02/25 23:15:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,
>> ip-172-31-22-135.ap-southeast-1.compute.internal):
>> java.text.ParseException: Unparseable number: "null"
>> at java.text.NumberFormat.parse(NumberFormat.java:350)
>>
>>
>
>


Re: [Help]: DataframeNAfunction fill method throwing exception

2016-02-25 Thread Jan Štěrba
just use coalesce function

df.selectExpr("name", "coalesce(age, 0) as age")

--
Jan Sterba
https://twitter.com/honzasterba | http://flickr.com/honzasterba |
http://500px.com/honzasterba

On Fri, Feb 26, 2016 at 5:27 AM, Divya Gehlot 
wrote:

> Hi,
> I have dataset which looks like below
> name age
> alice 35
> bob null
> peter 24
> I need to replace null values of columns with 0
> so  I referred Spark API DataframeNAfunctions.scala
> 
>
>  I tried the below code its throwing exception
> scala> import org.apache.spark.sql.types.{StructType, StructField,
> StringType,IntegerType,LongType,DoubleType, FloatType};
> import org.apache.spark.sql.types.{StructType, StructField, StringType,
> IntegerType, LongType, DoubleType, FloatType}
>
> scala> val nulltestSchema = StructType(Seq(StructField("name", StringType,
> false),StructField("age", DoubleType, true)))
> nulltestSchema: org.apache.spark.sql.types.StructType =
> StructType(StructField(name,StringType,false),
> StructField(age,DoubleType,true))
>
> scala> val dfnulltest =
> sqlContext.read.format("com.databricks.spark.csv").option("header",
> "true").schema(nulltestSchema).load("hdfs://
> 172.31.29.201:8020/TestDivya/Spark/nulltest.csv")
> dfnulltest: org.apache.spark.sql.DataFrame = [name: string, age: double]
>
> scala> val dfchangenull =
> dfnulltest.na.fill(0,Seq("age")).select("name","age")
> dfchangenull: org.apache.spark.sql.DataFrame = [name: string, age: double]
>
> scala> dfchangenull.show
> 16/02/25 23:15:59 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,
> ip-172-31-22-135.ap-southeast-1.compute.internal):
> java.text.ParseException: Unparseable number: "null"
> at java.text.NumberFormat.parse(NumberFormat.java:350)
>
>


Re: Help needed in deleting a message posted in Spark User List

2016-02-06 Thread Corey Nolet
The whole purpose of Apache mailing lists is that the messages get indexed
all over the web so that discussions and questions/solutions can be
searched easily by google and other engines.

For this reason, and the messages being sent via email as Steve pointed
out, it's just not possible to retract the messages.

On Sat, Feb 6, 2016 at 10:21 AM, Steve Loughran 
wrote:

>
> > On 5 Feb 2016, at 17:35, Marcelo Vanzin  wrote:
> >
> > You don't... just send a new one.
> >
> > On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy
> >  wrote:
> >> Hi,
> >>
> >> I want to edit/delete a message posted in Spark User List. How do I do
> that?
> >>
> >> Thanks!
> >
> >
> >
>
> it isn't technically possible
>
> http://apache.org/foundation/public-archives.html
>
> People do occasionally ask on the infrastructure mailing list to do do
> this, but they aren't in a position to do anything about the copies that
> end up in the mailboxes of every subscriber.
>
> Don't worry about it; we've all done things like post internal stack
> traces, accidentally mail the wrong list, etc, etc.
>
> Now, accidentally breaking the nightly build of everything, that's
> somewhat embarrassing —but you haven't done that and it's been ~4 months
> since I've done that myself.
>
>
> -Steve


Re: Help needed in deleting a message posted in Spark User List

2016-02-06 Thread Steve Loughran

> On 5 Feb 2016, at 17:35, Marcelo Vanzin  wrote:
> 
> You don't... just send a new one.
> 
> On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy
>  wrote:
>> Hi,
>> 
>> I want to edit/delete a message posted in Spark User List. How do I do that?
>> 
>> Thanks!
> 
> 
> 

it isn't technically possible

http://apache.org/foundation/public-archives.html

People do occasionally ask on the infrastructure mailing list to do this,
but they aren't in a position to do anything about the copies that end up in 
the mailboxes of every subscriber.

Don't worry about it; we've all done things like post internal stack traces, 
accidentally mail the wrong list, etc, etc. 

Now, accidentally breaking the nightly build of everything, that's somewhat 
embarrassing —but you haven't done that and it's been ~4 months since I've done 
that myself.


-Steve

Re: Help needed in deleting a message posted in Spark User List

2016-02-05 Thread Marcelo Vanzin
You don't... just send a new one.

On Fri, Feb 5, 2016 at 9:33 AM, swetha kasireddy
 wrote:
> Hi,
>
> I want to edit/delete a message posted in Spark User List. How do I do that?
>
> Thanks!



-- 
Marcelo

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Help me! Spark WebUI is corrupted!

2015-12-31 Thread Aniket Bhatnagar
Are you running on YARN or standalone?

On Thu, Dec 31, 2015, 3:35 PM LinChen  wrote:

> *Screenshot1(Normal WebUI)*
>
>
>
> *Screenshot2(Corrupted WebUI)*
>
>
>
> As screenshot2 shows, the format of my Spark WebUI looks strange and I
> cannot click the description of active jobs. It seems there is something
> missing in my opearing system. I googled it but find nothing. Could anybody
> help me?
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org


Re: Help: Driver OOM when shuffle large amount of data

2015-12-28 Thread Eugene Morozov
Kendal,

have you tried to reduce the number of partitions?
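
A sketch of that idea (the RDD names and the 2,000 figure are illustrative,
not from the thread):

    // Pass an explicit partition count to the shuffle instead of inheriting
    // 20K+ partitions from the input; fewer map/reduce partition pairs means
    // far less map-output status data for the driver to track and serialize.
    val reduced = mapped.reduceByKey(_ + _, 2000)
    reduced.saveAsTextFile("hdfs:///output/path")   // placeholder path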

--
Be well!
Jean Morozov

On Mon, Dec 28, 2015 at 9:02 AM, kendal  wrote:

> My driver is running OOM with my 4T data set... I don't collect any data to
> driver. All what the program done is map - reduce - saveAsTextFile. But the
> partitions to be shuffled is quite large - 20K+.
>
> The symptom what I'm seeing the timeout when GetMapOutputStatuses from
> Driver.
> 15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Don't have map outputs
> for shuffle 0, fetching them
> 15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Doing the fetch;
> tracker endpoint =
> AkkaRpcEndpointRef(Actor[akka.tcp://
> sparkDriver@10.115.58.55:52077/user/MapOutputTracker#-1937024516])
> 15/12/24 02:06:21 WARN akka.AkkaRpcEndpointRef: Error sending message
> [message = GetMapOutputStatuses(0)] in 1 attempts
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
> seconds]. This timeout is controlled by spark.rpc.askTimeout
> at
> org.apache.spark.rpc.RpcTimeout.org
> $apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
> at
>
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
> at
>
> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
> at
>
> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)
>
> But the root cause is OOM:
> 15/12/24 02:05:36 ERROR actor.ActorSystemImpl: Uncaught fatal error from
> thread [sparkDriver-akka.remote.default-remote-dispatcher-24] shutting down
> ActorSystem [sparkDriver]
> java.lang.OutOfMemoryError: Java heap space
> at java.util.Arrays.copyOf(Arrays.java:2271)
> at
> java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:131)
> at
> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
> at
>
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
> at
>
> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
> at
> akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:718)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>
> I've already allocated 16G memory for my driver - which is the hard limit
> MAX of my Yarn cluster. And I also applied Kryo serialization... Any idea
> to
> reduce memory foot point?
> And what confuses me is that, even I have 20K+ partition to shuffle, why I
> need so much memory?!
>
> Thank you so much for any help!
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Help-Driver-OOM-when-shuffle-large-amount-of-data-tp25818.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Help: Driver OOM when shuffle large amount of data

2015-12-28 Thread Chris Fregly
Which version of Spark is this?

Is there any chance that a single key - or set of keys - has a large number
of values relative to the other keys (a.k.a. skew)?

If so, Spark 1.5 *should* fix this issue with the new Tungsten stuff, although
I still had some issues with 1.5.1 in a similar situation.

I'm waiting to test with 1.6.0 before I start asking/creating JIRAs.

> On Dec 28, 2015, at 5:23 AM, Eugene Morozov  
> wrote:
> 
> Kendal, 
> 
> have you tried to reduce number of partitions?
> 
> --
> Be well!
> Jean Morozov
> 
>> On Mon, Dec 28, 2015 at 9:02 AM, kendal  wrote:
>> My driver is running OOM with my 4T data set... I don't collect any data to
>> driver. All what the program done is map - reduce - saveAsTextFile. But the
>> partitions to be shuffled is quite large - 20K+.
>> 
>> The symptom what I'm seeing the timeout when GetMapOutputStatuses from
>> Driver.
>> 15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Don't have map outputs
>> for shuffle 0, fetching them
>> 15/12/24 02:04:21 INFO spark.MapOutputTrackerWorker: Doing the fetch;
>> tracker endpoint =
>> AkkaRpcEndpointRef(Actor[akka.tcp://sparkDriver@10.115.58.55:52077/user/MapOutputTracker#-1937024516])
>> 15/12/24 02:06:21 WARN akka.AkkaRpcEndpointRef: Error sending message
>> [message = GetMapOutputStatuses(0)] in 1 attempts
>> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120
>> seconds]. This timeout is controlled by spark.rpc.askTimeout
>> at
>> org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcEnv.scala:214)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:229)
>> at
>> org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcEnv.scala:225)
>> at
>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>> at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcEnv.scala:242)
>> 
>> But the root cause is OOM:
>> 15/12/24 02:05:36 ERROR actor.ActorSystemImpl: Uncaught fatal error from
>> thread [sparkDriver-akka.remote.default-remote-dispatcher-24] shutting down
>> ActorSystem [sparkDriver]
>> java.lang.OutOfMemoryError: Java heap space
>> at java.util.Arrays.copyOf(Arrays.java:2271)
>> at
>> java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:178)
>> at akka.serialization.JavaSerializer.toBinary(Serializer.scala:131)
>> at
>> akka.remote.MessageSerializer$.serialize(MessageSerializer.scala:36)
>> at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>> at
>> akka.remote.EndpointWriter$$anonfun$serializeMessage$1.apply(Endpoint.scala:843)
>> at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
>> at akka.remote.EndpointWriter.serializeMessage(Endpoint.scala:842)
>> at akka.remote.EndpointWriter.writeSend(Endpoint.scala:743)
>> at
>> akka.remote.EndpointWriter$$anonfun$4.applyOrElse(Endpoint.scala:718)
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
>> at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:411)
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>> 
>> I've already allocated 16G memory for my driver - which is the hard limit
>> MAX of my Yarn cluster. And I also applied Kryo serialization... Any idea to
>> reduce memory foot point?
>> And what confuses me is that, even I have 20K+ partition to shuffle, why I
>> need so much memory?!
>> 
>> Thank you so much for any help!
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Help-Driver-OOM-when-shuffle-large-amount-of-data-tp25818.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-14 Thread Jakob Odersky
> Sorry,I'm late.I try again and again ,now I use spark 1.4.0 ,hadoop
2.4.1.but I also find something strange like this :

>
http://apache-spark-user-list.1001560.n3.nabble.com/worker-java-lang-ClassNotFoundException-ttt-test-anonfun-1-td25696.html
> (if i use "textFile",It can't run.)

In the link you sent, there is still an `addJar(spark-assembly-hadoop-xx)`,
can you try running your application with that?

On 11 December 2015 at 03:08, Bonsen  wrote:

> Thank you,and I find the problem is my package is test,but I write package
> org.apache.spark.examples ,and IDEA had imported the
> spark-examples-1.5.2-hadoop2.6.0.jar ,so I can run it,and it makes lots of
> problems
> __
> Now , I change the package like this:
>
> package test
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> object test {
>   def main(args: Array[String]) {
> val conf = new
> SparkConf().setAppName("mytest").setMaster("spark://Master:7077")
> val sc = new SparkContext(conf)
> sc.addJar("/home/hadoop/spark-assembly-1.5.2-hadoop2.6.0.jar")//It
> doesn't work.!?
> val rawData = sc.textFile("/home/hadoop/123.csv")
> val secondData = rawData.flatMap(_.split(",").toString)
> println(secondData.first)   /line 32
> sc.stop()
>   }
> }
> it causes that:
> 15/12/11 18:41:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> 219.216.65.129): java.lang.ClassNotFoundException: test.test$$anonfun$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> 
> 
> //  219.216.65.129 is my worker computer.
> //  I can connect to my worker computer.
> // Spark can start successfully.
> //  addFile is also doesn't work,the tmp file will also dismiss.
>
>
>
>
>
>
> At 2015-12-10 22:32:21, "Himanshu Mehra [via Apache Spark User List]" <[hidden
> email] > wrote:
>
> You are trying to print an array, but anyway it will print the objectID
>  of the array if the input is same as you have shown here. Try flatMap()
> instead of map and check if the problem is same.
>
>--Himanshu
>
> --
> If you reply to this email, your message will be added to the discussion
> below:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/HELP-I-get-java-lang-String-cannot-be-cast-to-java-lang-Intege-for-a-long-time-tp25666p25667.html
> To unsubscribe from HELP! I get "java.lang.String cannot be cast to
> java.lang.Intege " for a long time., click here.
> NAML
> 
>
>
>
>
>
> --
> View this message in context: Re:Re: HELP! I get "java.lang.String cannot
> be cast to java.lang.Intege " for a long time.
> 
>
> Sent from the Apache Spark User List mailing list archive
>  at Nabble.com.
>


Re: Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-14 Thread Jakob Odersky
sorry typo, I meant *without* the addJar

On 14 December 2015 at 11:13, Jakob Odersky  wrote:

> > Sorry,I'm late.I try again and again ,now I use spark 1.4.0 ,hadoop
> 2.4.1.but I also find something strange like this :
>
> >
> http://apache-spark-user-list.1001560.n3.nabble.com/worker-java-lang-ClassNotFoundException-ttt-test-anonfun-1-td25696.html
> > (if i use "textFile",It can't run.)
>
> In the link you sent, there is still an
> `addJar(spark-assembly-hadoop-xx)`, can you try running your application
> with that?
>
> On 11 December 2015 at 03:08, Bonsen  wrote:
>
>> Thank you,and I find the problem is my package is test,but I write
>> package org.apache.spark.examples ,and IDEA had imported the
>> spark-examples-1.5.2-hadoop2.6.0.jar ,so I can run it,and it makes lots of
>> problems
>> __
>> Now , I change the package like this:
>>
>> package test
>> import org.apache.spark.SparkConf
>> import org.apache.spark.SparkContext
>> object test {
>>   def main(args: Array[String]) {
>> val conf = new
>> SparkConf().setAppName("mytest").setMaster("spark://Master:7077")
>> val sc = new SparkContext(conf)
>> sc.addJar("/home/hadoop/spark-assembly-1.5.2-hadoop2.6.0.jar") // It doesn't work!?
>> val rawData = sc.textFile("/home/hadoop/123.csv")
>> val secondData = rawData.flatMap(_.split(",").toString)
>> println(secondData.first)   // line 32
>> sc.stop()
>>   }
>> }
>> It causes this:
>> 15/12/11 18:41:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
>> 219.216.65.129): java.lang.ClassNotFoundException: test.test$$anonfun$1
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>> 
>> 
>> //  219.216.65.129 is my worker computer.
>> //  I can connect to my worker computer.
>> // Spark can start successfully.
>> // addFile also doesn't work; the temporary file also disappears.
>>
>>
>>
>>
>>
>>
>> At 2015-12-10 22:32:21, "Himanshu Mehra [via Apache Spark User List]" 
>> <[hidden
>> email] > wrote:
>>
>> You are trying to print an array, but anyway it will print the objectID
>>  of the array if the input is same as you have shown here. Try flatMap()
>> instead of map and check if the problem is same.
>>
>>--Himanshu
>>
>
>


Re: Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-11 Thread Jakob Odersky
It looks like you have an issue with your classpath. I think it is because
you add a jar containing Spark twice: first, you have a dependency on Spark
somewhere in your build tool (this allows you to compile and run your
application); second, you re-add Spark here:

>  sc.addJar("/home/hadoop/spark-assembly-1.5.2-hadoop2.6.0.jar") // It doesn't work!?

I recommend you remove that line and see if everything works.
If you have that line because you need Hadoop 2.6, I recommend you build
Spark against that version and publish it locally with Maven.
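
For what it's worth, here is a minimal sketch of that setup, assuming an sbt
build (the project name, Scala version, and jar path below are my assumptions,
not taken from this thread): declare Spark as a "provided" dependency so it is
never bundled into your application jar, and let spark-submit supply it at
runtime instead of calling sc.addJar on the Spark assembly.

// build.sbt -- hedged sketch; names and versions are assumptions
name := "mytest"

scalaVersion := "2.10.6"

// "provided": compile against Spark, but do not bundle it into the jar,
// so only one copy of Spark ends up on the runtime classpath.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2" % "provided"

Then run sbt package and submit with something like
spark-submit --class test.test --master spark://Master:7077 target/scala-2.10/mytest_2.10-1.0.jar
(the jar path is hypothetical). If a Hadoop 2.6 build of Spark really is needed,
the Spark distribution itself can be built with the hadoop-2.6 Maven profile
and installed locally, as suggested above.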


Re: Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-11 Thread Jakob Odersky
By the way, Spark 1.5 comes with support for Hadoop 2.2 by default.

On 11 December 2015 at 03:08, Bonsen  wrote:

> Thank you. I found the problem: my package is test, but I wrote package
> org.apache.spark.examples, and IDEA had imported the
> spark-examples-1.5.2-hadoop2.6.0.jar, so I could run it, and that caused lots
> of problems.
> __
> Now, I have changed the package like this:
>
> package test
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> object test {
>   def main(args: Array[String]) {
> val conf = new
> SparkConf().setAppName("mytest").setMaster("spark://Master:7077")
> val sc = new SparkContext(conf)
> sc.addJar("/home/hadoop/spark-assembly-1.5.2-hadoop2.6.0.jar") // It doesn't work!?
> val rawData = sc.textFile("/home/hadoop/123.csv")
> val secondData = rawData.flatMap(_.split(",").toString)
> println(secondData.first)   // line 32
> sc.stop()
>   }
> }
> It causes this:
> 15/12/11 18:41:06 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> 219.216.65.129): java.lang.ClassNotFoundException: test.test$$anonfun$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
> 
> 
> //  219.216.65.129 is my worker computer.
> //  I can connect to my worker computer.
> // Spark can start successfully.
> // addFile also doesn't work; the temporary file also disappears.
>
>
>
>
>
>
> At 2015-12-10 22:32:21, "Himanshu Mehra [via Apache Spark User List]" <[hidden
> email] > wrote:
>
> You are trying to print an array, but anyway it will print the objectID
>  of the array if the input is same as you have shown here. Try flatMap()
> instead of map and check if the problem is same.
>
>--Himanshu
>


Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-10 Thread Jakob Odersky
Could you provide some more context? What is rawData?

On 10 December 2015 at 06:38, Bonsen  wrote:

> I did it like this: "val secondData = rawData.flatMap(_.split("\t").take(3))"
>
> and I get:
> 15/12/10 22:36:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
> 219.216.65.129): java.lang.ClassCastException: java.lang.String cannot be
> cast to java.lang.Integer
> at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
> at
> org.apache.spark.examples.SparkPi$$anonfun$1.apply(SparkPi.scala:32)
> at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
>
>
>
>


Re: Help: Get Timeout error and FileNotFoundException when shuffling large files

2015-12-10 Thread Sudhanshu Janghel
Can you please paste the stack trace?

Sudhanshu


Re: Help: Get Timeout error and FileNotFoundException when shuffling large files

2015-12-10 Thread manasdebashiskar
Is that the only kind of error you are getting?
Is it possible something else dies and gets buried in the other messages?
Try repairing HDFS (fsck, etc.) to check whether the blocks are intact.

A few things to check:
1) Whether you have too many small files.
2) Is your system complaining about too many inodes, etc.?
3) Try a smaller data set, then increase its size, to confirm whether the
problem is data-volume related.
4) If you have monitoring turned on, look at CPU and disk I/O on your driver
and worker machines.
5) Have you tried increasing driver memory? (More partitions mean the driver
needs more memory to keep the metadata; see the sketch below.)

..Manas
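
As a rough illustration of points 3) and 5) above: driver memory usually has to
be raised on the command line (for example spark-submit --driver-memory 4g ...),
because it must be set before the driver JVM starts, while the data-volume and
partition-count side can be sketched in code. The input path, sample fraction,
and partition count below are assumptions for illustration, not values taken
from this thread, and sc is an already-created SparkContext.

// Hedged sketch -- input path, sample fraction, and partition count are assumptions.
val raw = sc.textFile("hdfs:///data/large-input")

// Point 3: start with a small sample to check whether the failure is volume-related.
val sample = raw.sample(false, 0.01, 42L)

// Fewer, larger shuffle partitions mean fewer shuffle files on disk, which matters
// when "too many small files" or inode exhaustion is the suspect.
val keyed = sample.map(line => (line.take(8), 1L)).reduceByKey(_ + _, 200)
println(keyed.count())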








Re: HELP! I get "java.lang.String cannot be cast to java.lang.Intege " for a long time.

2015-12-10 Thread Bonsen
I did it like this: "val secondData = rawData.flatMap(_.split("\t").take(3))"

and I get:
15/12/10 22:36:55 WARN TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0,
219.216.65.129): java.lang.ClassCastException: java.lang.String cannot be
cast to java.lang.Integer
at scala.runtime.BoxesRunTime.unboxToInt(BoxesRunTime.java:106)
at org.apache.spark.examples.SparkPi$$anonfun$1.apply(SparkPi.scala:32)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308)
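
For reference, here is a minimal sketch of what map versus flatMap produce over
split lines; the path and split("\t") follow the code in this thread, but the
field layout and printed values are assumptions. Note also that the stack trace
above points at org.apache.spark.examples.SparkPi$$anonfun$1, i.e. the
ClassCastException is thrown from SparkPi's own code, which matches the package
mix-up Bonsen describes in his follow-up messages.

// Hedged sketch -- the field layout is an assumption.
val rawData = sc.textFile("/home/hadoop/123.csv")

// map keeps one Array[String] per input line:
val perLine = rawData.map(_.split("\t").take(3))      // RDD[Array[String]]

// flatMap flattens those arrays into a single RDD of individual fields:
val fields = rawData.flatMap(_.split("\t").take(3))   // RDD[String]

println(perLine.first.mkString(","))  // the first three fields of the first line
println(fields.first)                 // only the first field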






Re: Help with type check

2015-12-01 Thread Eyal Sharon
Great, that works perfectly!
Also, thanks for the links - very helpful.

On Tue, Dec 1, 2015 at 12:13 AM, Jakob Odersky  wrote:

> Hi Eyal,
>
> what you're seeing is not a Spark issue, it is related to boxed types.
>
> I assume 'b' in your code is some kind of java buffer, where b.getDouble()
> returns an instance of java.lang.Double and not a scala.Double. Hence
> muCouch is an Array[java.lang.Double], an array containing boxed doubles.
>
> To fix your problem, change 'yield b.getDouble(i)' to 'yield
> b.getDouble(i).doubleValue'
>
> You might want to have a look at these too:
> -
> http://stackoverflow.com/questions/23821576/efficient-conversion-of-java-util-listjava-lang-double-to-scala-listdouble
> - https://docs.oracle.com/javase/7/docs/api/java/lang/Double.html
> - http://www.scala-lang.org/api/current/index.html#scala.Double
>
> On 30 November 2015 at 10:13, Eyal Sharon  wrote:
>
>> Hi ,
>>
>> I have a problem figuring out what the type bug is here.
>>
>> I have this code fragment; it parses JSON to Array[Double]:
>>
>>
>>
>>
>>
>>
>> val muCouch = {
>>   val e = input.filter(_.id == "mu")(0).content()
>>   val b = e.getArray("feature_mean")
>>   for (i <- 0 to e.getInt("features")) yield b.getDouble(i)
>> }.toArray
>>
>> Now the problem is when I want to create a dense vector:
>>
>> new DenseVector(muCouch)
>>
>> I get the following error:
>>
>> Error:(111, 21) type mismatch;
>>  found   : Array[java.lang.Double]
>>  required: Array[scala.Double]
>>
>>
>> Now, I can probably find a workaround for this, but I want a deeper
>> understanding of why it occurs.
>>
>> p.s - I do use collection.JavaConversions._
>>
>> Thanks !
>>
>>
>>
>>
>>
>
>



Re: Help with type check

2015-11-30 Thread Jakob Odersky
Hi Eyal,

what you're seeing is not a Spark issue, it is related to boxed types.

I assume 'b' in your code is some kind of java buffer, where b.getDouble()
returns an instance of java.lang.Double and not a scala.Double. Hence
muCouch is an Array[java.lang.Double], an array containing boxed doubles.

To fix your problem, change 'yield b.getDouble(i)' to 'yield
b.getDouble(i).doubleValue'

You might want to have a look at these too:
-
http://stackoverflow.com/questions/23821576/efficient-conversion-of-java-util-listjava-lang-double-to-scala-listdouble
- https://docs.oracle.com/javase/7/docs/api/java/lang/Double.html
- http://www.scala-lang.org/api/current/index.html#scala.Double
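
To make the boxing issue concrete, here is a minimal, self-contained sketch; the
sample values are made up, and I am assuming the MLlib DenseVector, so this is
not the code from the original question.

// Hedged sketch of the boxed vs. primitive Double mismatch; sample values are assumptions.
import org.apache.spark.mllib.linalg.DenseVector

val boxed: Array[java.lang.Double] =
  Array(java.lang.Double.valueOf(1.0), java.lang.Double.valueOf(2.5))

// new DenseVector(boxed)  // does not compile: Array[java.lang.Double] is not Array[scala.Double]

val unboxed: Array[Double] = boxed.map(_.doubleValue)  // unbox each element explicitly
val v = new DenseVector(unboxed)                       // DenseVector expects Array[scala.Double]
println(v)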

On 30 November 2015 at 10:13, Eyal Sharon  wrote:

> Hi ,
>
> I have a problem figuring out what the type bug is here.
>
> I have this code fragment; it parses JSON to Array[Double]:
>
>
>
>
>
>
> val muCouch = {
>   val e = input.filter(_.id == "mu")(0).content()
>   val b = e.getArray("feature_mean")
>   for (i <- 0 to e.getInt("features")) yield b.getDouble(i)
> }.toArray
>
> Now the problem is when I want to create a dense vector:
>
> new DenseVector(muCouch)
>
> I get the following error:
>
> Error:(111, 21) type mismatch;
>  found   : Array[java.lang.Double]
>  required: Array[scala.Double]
>
>
> Now, I can probably find a workaround for this, but I want a deeper
> understanding of why it occurs.
>
> p.s - I do use collection.JavaConversions._
>
> Thanks !
>
>
>
>
>


Re: Help with Couchbase connector error

2015-11-29 Thread Eyal Sharon
Thanks guys, that was very helpful.



On Thu, Nov 26, 2015 at 10:29 PM, Shixiong Zhu  wrote:

> Hey Eyal, I just checked the Couchbase Spark connector jar. The target
> version of some of the classes is Java 8 (52.0). You can create a ticket at
> https://issues.couchbase.com/projects/SPARKC
>
> Best Regards,
> Shixiong Zhu
>
> 2015-11-26 9:03 GMT-08:00 Ted Yu :
>
>> StoreMode is from the Couchbase connector.
>>
>> Where did you obtain the connector?
>>
>> See also
>> http://stackoverflow.com/questions/1096148/how-to-check-the-jdk-version-used-to-compile-a-class-file
>>
>> On Thu, Nov 26, 2015 at 8:55 AM, Eyal Sharon  wrote:
>>
>>> Hi,
>>> Great, that gave me some direction. But can you elaborate more, or share
>>> a relevant post?
>>> I am currently running JDK 7, and so is my Couchbase.
>>>
>>> Thanks !
>>>
>>> On Thu, Nov 26, 2015 at 6:02 PM, Ted Yu  wrote:
>>>
 This implies a version mismatch between the JDK used to build your jar
 and the one used at runtime.

 When building, target JDK 1.7

 There're plenty of posts on the web for dealing with such error.

 Cheers

 On Thu, Nov 26, 2015 at 7:31 AM, Eyal Sharon  wrote:

> Hi,
>
> I am trying to set up a connection to Couchbase. I am at the very
> beginning, and I got stuck on this exception:
>
> Exception in thread "main" java.lang.UnsupportedClassVersionError:
> com/couchbase/spark/StoreMode : Unsupported major.minor version 52.0
>
>
> Here is the simple code fragment
>
>   val sc = new SparkContext(cfg)
>
>   val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", 
> "content"))
>   val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", 
> "content", "in", "here"))
>
>
>   val data = sc
> .parallelize(Seq(doc1, doc2))
> .saveToCouchbase()
> }
>
>
> Any help would be a blessing.
>
>
> Thanks!
>
>
>


>>>
>>>
>>
>>
>



Re: Help with Couchbase connector error

2015-11-26 Thread Shixiong Zhu
Hey Eyal, I just checked the Couchbase Spark connector jar. The target
version of some of the classes is Java 8 (52.0). You can create a ticket at
https://issues.couchbase.com/projects/SPARKC

Best Regards,
Shixiong Zhu
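
In case it is useful, here is a hedged sketch of checking a class file's target
version directly from Scala; the path is hypothetical (for example, a class
extracted from the connector jar).

// Hedged sketch -- the .class path is hypothetical.
import java.io.{DataInputStream, FileInputStream}

val in = new DataInputStream(new FileInputStream("/tmp/StoreMode.class"))
try {
  val magic = in.readInt()            // 0xCAFEBABE for a valid class file
  val minor = in.readUnsignedShort()
  val major = in.readUnsignedShort()  // 51 = Java 7, 52 = Java 8
  println(f"magic=0x$magic%08X minor=$minor major=$major")
} finally {
  in.close()
}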

2015-11-26 9:03 GMT-08:00 Ted Yu :

> StoreMode is from the Couchbase connector.
>
> Where did you obtain the connector?
>
> See also
> http://stackoverflow.com/questions/1096148/how-to-check-the-jdk-version-used-to-compile-a-class-file
>
> On Thu, Nov 26, 2015 at 8:55 AM, Eyal Sharon  wrote:
>
>> Hi,
>> Great, that gave me some direction. But can you elaborate more, or share
>> a relevant post?
>> I am currently running JDK 7, and so is my Couchbase.
>>
>> Thanks !
>>
>> On Thu, Nov 26, 2015 at 6:02 PM, Ted Yu  wrote:
>>
>>> This implies a version mismatch between the JDK used to build your jar and
>>> the one used at runtime.
>>>
>>> When building, target JDK 1.7
>>>
>>> There're plenty of posts on the web for dealing with such error.
>>>
>>> Cheers
>>>
>>> On Thu, Nov 26, 2015 at 7:31 AM, Eyal Sharon  wrote:
>>>
 Hi,

 I am trying to set up a connection to Couchbase. I am at the very
 beginning, and I got stuck on this exception:

 Exception in thread "main" java.lang.UnsupportedClassVersionError:
 com/couchbase/spark/StoreMode : Unsupported major.minor version 52.0


 Here is the simple code fragment

   val sc = new SparkContext(cfg)

   val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", 
 "content"))
   val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", 
 "content", "in", "here"))


   val data = sc
 .parallelize(Seq(doc1, doc2))
 .saveToCouchbase()
 }


 Any help would be a blessing.


 Thanks!



>>>
>>>
>>
>>
>
>


Re: Help with Couchbase connector error

2015-11-26 Thread Ted Yu
This implies a version mismatch between the JDK used to build your jar and
the one used at runtime.

When building, target JDK 1.7.

There are plenty of posts on the web about dealing with such an error.
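
If the Java 8 class turns out to be in your own jar rather than in the connector
itself (Shixiong's finding elsewhere in this thread), a hedged sketch of pinning
an sbt build to Java 7 bytecode could look like the lines below; these are
standard sbt keys and compiler flags, but using them here is my assumption, not
something from this thread.

// Hedged sketch -- assumes an sbt build on Scala 2.10/2.11.
javacOptions ++= Seq("-source", "1.7", "-target", "1.7")  // Java sources -> Java 7 bytecode
scalacOptions += "-target:jvm-1.7"                        // Scala sources -> Java 7 bytecode

To confirm which JDK a given class file was built for, the Stack Overflow link
mentioned elsewhere in this thread shows the javap approach (major version 51 is
Java 7, 52 is Java 8).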

Cheers

On Thu, Nov 26, 2015 at 7:31 AM, Eyal Sharon  wrote:

> Hi,
>
> I am trying to set up a connection to Couchbase. I am at the very beginning,
> and I got stuck on this exception:
>
> Exception in thread "main" java.lang.UnsupportedClassVersionError:
> com/couchbase/spark/StoreMode : Unsupported major.minor version 52.0
>
>
> Here is the simple code fragment
>
>   val sc = new SparkContext(cfg)
>
>   val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", 
> "content"))
>   val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", 
> "content", "in", "here"))
>
>
>   val data = sc
> .parallelize(Seq(doc1, doc2))
> .saveToCouchbase()
> }
>
>
> Any help would be a blessing.
>
>
> Thanks!
>
>
>

