RE: we control spark file names before we write them - should we opensource it?

2020-06-08 Thread Stefan Panayotov
Yes, I think so.

Stefan Panayotov, PhD
spanayo...@outlook.com
spanayo...@comcast.net
spanayo...@gmail.com

-Original Message-
From: ilaimalka  
Sent: Monday, June 8, 2020 9:17 AM
To: user@spark.apache.org
Subject: we control spark file names before we write them - should we 
opensource it?

Hi, as part of our work we needed more control over the names of the files written 
out by Spark. E.g., instead of "part-...csv.gz" we want to get something 
like "15988891_1748330679_20200507124153.tsv.gz", where the first number is 
hardcoded, the second is the value from partitionBy, and the third is a 
timestamp in a provided SimpleDateFormat.

After researching the possibilities, the most common approach is to find those 
files and rename them *after* the Spark job has finished. We tried to find a 
more efficient way.

We decided to implement a new DataSource which is actually a wrapper around most 
standard Spark file formats (csv, json, text, parquet, avro) and lets us 
set the file name before it is written.

In short, this is how it works:
the data source extends FileFormat and implements prepareWrite, which redirects to 
our local FileNameOutputWriterFactory; that factory in turn delegates to the original 
Spark format's OutputWriterFactory, which does the actual work, and by 
reflection it can call any implementation to control the file name.
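
A minimal sketch of that shape (for illustration only, not the actual implementation), 
written against the Spark 2.x internal FileFormat / OutputWriterFactory APIs, whose 
signatures change between releases; the class name, the "prefix" option, and the 
naming rule below are placeholders:

// Sketch only: wrap an existing format and rewrite the output file name.
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.sources.DataSourceRegister
import org.apache.spark.sql.types.StructType

class RenamingFileFormat extends FileFormat with DataSourceRegister {
  private val delegateFormat = new CSVFileFormat()   // could be any wrapped format

  override def shortName(): String = "renaming"

  override def inferSchema(spark: SparkSession, options: Map[String, String],
                           files: Seq[FileStatus]): Option[StructType] =
    delegateFormat.inferSchema(spark, options, files)

  override def prepareWrite(spark: SparkSession, job: Job, options: Map[String, String],
                            dataSchema: StructType): OutputWriterFactory = {
    // Let the real format build its writer factory, then wrap it so we can
    // rewrite the "part-..." path Spark chose before the writer is created.
    val delegate = delegateFormat.prepareWrite(spark, job, options, dataSchema)
    new OutputWriterFactory {
      override def getFileExtension(context: TaskAttemptContext): String =
        delegate.getFileExtension(context)
      override def newInstance(path: String, dataSchema: StructType,
                               context: TaskAttemptContext): OutputWriter = {
        val dir = path.substring(0, path.lastIndexOf('/'))
        delegate.newInstance(s"$dir/${buildFileName(options, context)}", dataSchema, context)
      }
    }
  }

  // Hypothetical naming rule: <hardcoded prefix>_<task id>_<timestamp>.tsv.gz
  private def buildFileName(options: Map[String, String], context: TaskAttemptContext): String = {
    val ts = new java.text.SimpleDateFormat("yyyyMMddHHmmss").format(new java.util.Date)
    val taskId = context.getTaskAttemptID.getTaskID.getId
    s"${options.getOrElse("prefix", "out")}_${taskId}_$ts.tsv.gz"
  }
}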

The question is - is this interesting/useful enough for the community?
Should we open-source it?
Thanks!

P.S. We posted the same question on the Spark channel on the ASF Slack if you want 
to discuss it there:
https://the-asf.slack.com/archives/CD5UQDNBA/p1589117451069600



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org





RE: spark 2 new stuff

2018-02-26 Thread Stefan Panayotov
To me Delta is very valuable.

Stefan Panayotov, PhD
spanayo...@outlook.com<mailto:spanayo...@outlook.com>
spanayo...@comcast.net<mailto:spanayo...@comcast.net>
Cell: 610-517-5586

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Monday, February 26, 2018 9:26 AM
To: user @spark <user@spark.apache.org>
Subject: spark 2 new stuff

just a quick query.

From a practitioner's point of view, which new features of Spark 2 have been the 
most value for money?

I hear different and often conflicting things. However, I was wondering if the 
user group has more practical takes.


regards,




Dr Mich Talebzadeh



LinkedIn https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com



Disclaimer: Use it at your own risk. Any and all responsibility for any loss, 
damage or destruction of data or any other property which may arise from 
relying on this email's technical content is explicitly disclaimed. The author 
will in no case be liable for any monetary damages arising from such loss, 
damage or destruction.




RE: How to output RDD to one file with specific name?

2016-08-25 Thread Stefan Panayotov
You can do something like:

 

dbutils.fs.cp("/foo/part-0","/foo/my-data.csv")
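
(dbutils is Databricks-specific. A sketch of the same rename using the plain Hadoop 
FileSystem API on the driver; the paths are illustrative and sc is the active 
SparkContext.)

// Sketch: rename the single part file produced by coalesce(1) using the
// Hadoop FileSystem API instead of dbutils.
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val partFile = fs.globStatus(new Path("/foo/part-*"))(0).getPath
fs.rename(partFile, new Path("/foo/my-data.csv"))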

 

Stefan Panayotov, PhD

 <mailto:spanayo...@outlook.com> spanayo...@outlook.com

 <mailto:spanayo...@comcast.net> spanayo...@comcast.net

Cell: 610-517-5586

Home: 610-355-0919

 

From: Gavin Yue [mailto:yue.yuany...@gmail.com] 
Sent: Thursday, August 25, 2016 1:15 PM
To: user <user@spark.apache.org>
Subject: How to output RDD to one file with specific name?

 

I am trying to output RDD to disk by

 

rdd.coalesce(1).saveAsTextFile("/foo")

 

It outputs to the foo folder with a file named part-0. 

 

Is there a way I could directly save the file as /foo/somename ?

 

Thanks. 

 



adding rows to a DataFrame

2016-03-11 Thread Stefan Panayotov
Hi,
 
I have a problem that requires me to go through the rows in a DataFrame (or 
possibly through rows in a JSON file) and conditionally add rows depending on a 
value in one of the columns in each existing row. So, for example if I have:
 
+---+---+---+
| _1| _2| _3|
+---+---+---+
|ID1|100|1.1|
|ID2|200|2.2|
|ID3|300|3.3|
|ID4|400|4.4|
+---+---+---+



I need to be able to get:
 
+---+---+---+--------------------+---+
| _1| _2| _3|                  _4| _5|
+---+---+---+--------------------+---+
|ID1|100|1.1|ID1 add text or d...| 25|
|id11 ..|21 |
|id12 ..|22 |
|ID2|200|2.2|ID2 add text or d...| 50|
|id21 ..|33 |
|id22 ..|34 |
|id23 ..|35 |
|ID3|300|3.3|ID3 add text or d...| 75|
|id31 ..|11 |
|ID4|400|4.4|ID4 add text or d...|100|
|id41 ..|51 |
|id42 ..|52 |
|id43 ..|53 |
|id44 ..|54 |
+---+---+---+--------------------+---+



How can I achieve this in Spark without doing DF.collect(), which would bring 
everything to the driver and, for a big data set, cause an OOM?
BTW, I know how to use withColumn() to add new columns to the DataFrame. I also 
need to add new rows.
Any help will be appreciated.
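
One approach (a sketch, not from the thread) is to flatMap each row into itself plus 
its generated children, so nothing is collected to the driver; the Out case class and 
the child-row rule below are placeholders, not the real business logic:

// Sketch only: expand rows with flatMap so everything stays distributed.
import sqlContext.implicits._   // spark.implicits._ on Spark 2.x

case class Out(_1: String, _2: String, _3: String)

val expanded = df.rdd.flatMap { row =>
  val id = row.getString(0)
  val parent = Out(id, row.get(1).toString, row.get(2).toString)
  val children = (1 to 2).map(i => Out(s"${id.toLowerCase}$i ..", "", ""))
  parent +: children
}.toDF()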

Thanks,


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

RE: Spark 1.5.2 memory error

2016-02-03 Thread Stefan Panayotov
I drastically increased the memory:
 
spark.executor.memory = 50g
spark.driver.memory = 8g
spark.driver.maxResultSize = 8g
spark.yarn.executor.memoryOverhead = 768
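
For reference, a sketch of the same settings applied programmatically on a SparkConf 
(values copied from the list above; note spark.yarn.executor.memoryOverhead is in MB, 
and the YARN warning quoted later in this thread suggests boosting it):

// Sketch only: the settings listed above expressed on a SparkConf.
val conf = new org.apache.spark.SparkConf()
  .set("spark.executor.memory", "50g")
  .set("spark.driver.memory", "8g")
  .set("spark.driver.maxResultSize", "8g")
  .set("spark.yarn.executor.memoryOverhead", "768")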
 
I still see executors killed, but this time the memory does not seem to be the 
issue.
The error on the Jupyter notebook is:
 
Py4JJavaError: An error occurred while calling 
z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Exception 
while getting task result: java.io.IOException: Failed to connect to 
/10.0.0.9:48755 
From the nodemanager log corresponding to worker 10.0.0.9:
 



2016-02-03 17:31:44,917 INFO  yarn.YarnShuffleService (YarnShuffleService.java:initializeApplication(129)) - Initializing application application_1454509557526_0014
2016-02-03 17:31:44,918 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from LOCALIZING to LOCALIZED
2016-02-03 17:31:44,947 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from LOCALIZED to RUNNING
2016-02-03 17:31:44,951 INFO  nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:buildCommandExecutor(267)) - launchContainer: [bash, /mnt/resource/hadoop/yarn/local/usercache/root/appcache/application_1454509557526_0014/container_1454509557526_0014_01_93/default_container_executor.sh]
2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(371)) - Starting resource-monitoring for container_1454509557526_0014_01_93
2016-02-03 17:31:45,686 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for container_1454509557526_0014_01_11


 


Then I can see the memory usage increasing from 230.6 MB to 12.6 GB, which is far 
below 50g, and then the container suddenly gets killed!?!


 


2016-02-03 17:33:17,350 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 30962 for container-id container_1454509557526_0014_01_93: 12.6 GB of 51 GB physical memory used; 52.8 GB of 107.1 GB virtual memory used
2016-02-03 17:33:17,613 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from RUNNING to KILLING
2016-02-03 17:33:17,613 INFO  launcher.ContainerLaunch (ContainerLaunch.java:cleanupContainer(370)) - Cleaning up container container_1454509557526_0014_01_93
2016-02-03 17:33:17,629 WARN  nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:launchContainer(223)) - Exit code from container container_1454509557526_0014_01_93 is : 143
2016-02-03 17:33:17,667 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
2016-02-03 17:33:17,669 INFO  nodemanager.NMAuditLogger (NMAuditLogger.java:logSuccess(89)) - USER=root OPERATION=Container Finished - Killed TARGET=ContainerImpl RESULT=SUCCESS APPID=application_1454509557526_0014 CONTAINERID=container_1454509557526_0014_01_93
2016-02-03 17:33:17,670 INFO  container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from CONTAINER_CLEANEDUP_AFTER_KILL to DONE
2016-02-03 17:33:17,670 INFO  application.ApplicationImpl (ApplicationImpl.java:transition(347)) - Removing container_1454509557526_0014_01_93 from application application_1454509557526_0014
2016-02-03 17:33:17,671 INFO  logaggregation.AppLogAggregatorImpl (AppLogAggregatorImpl.java:startContainerLogAggregation(546)) - Considering container container_1454509557526_0014_01_93 for log-aggregation
2016-02-03 17:33:17,671 INFO  containermanager.AuxServices (AuxServices.java:handle(196)) - Got event CONTAINER_STOP for appId application_1454509557526_0014
2016-02-03 17:33:17,671 INFO  yarn.YarnShuffleService (YarnShuffleService.java:stopContainer(161)) - Stopping container container_1454509557526_0014_01_93
2016-02-03 17:33:20,351 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for container_1454509557526_0014_01_93
2016-02-03 17:33:20,383 INFO  monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 28727 for container-id container_1454509557526_0012_01_01: 319.8 MB of 1.5 GB physical memory used; 1.7 GB of 3.1 GB virtual memory used
2016-02-03 17:33:22,627 INFO  nodemanager.NodeStatusUpdaterImpl (NodeStatusUpdaterImpl.java:removeOrTrackCompletedContainersFromContext(529)) - Removed completed containers from NM context: [container_1454509557526_0014_01_93]
 
I'll appreciate any suggestions.

Thanks,


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.

Spark 1.5.2 memory error

2016-02-02 Thread Stefan Panayotov
Hi Guys,
 
I need help with Spark memory errors when executing ML pipelines.
The error that I see is:
 



16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 32.0 in stage 32.0 (TID 3298)
16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 12.0 in stage 32.0 (TID 3278)
16/02/02 20:34:39 INFO MemoryStore: ensureFreeSpace(2004728720) called with curMem=296303415, maxMem=8890959790
16/02/02 20:34:39 INFO MemoryStore: Block taskresult_3298 stored as bytes in memory (estimated size 1911.9 MB, free 6.1 GB)
16/02/02 20:34:39 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/02/02 20:34:39 ERROR Executor: Exception in task 12.0 in stage 32.0 (TID 3278)
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/02/02 20:34:39 INFO DiskBlockManager: Shutdown hook called
16/02/02 20:34:39 INFO Executor: Finished task 32.0 in stage 32.0 (TID 3298). 2004728720 bytes result sent via BlockManager)
16/02/02 20:34:39 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-8,5,main]
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2271)
    at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
16/02/02 20:34:39 INFO ShutdownHookManager: Shutdown hook called
16/02/02 20:34:39 INFO MetricsSystemImpl: Stopping azure-file-system metrics system...
16/02/02 20:34:39 INFO MetricsSinkAdapter: azurefs2 thread interrupted.
16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system stopped.
16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system shutdown complete.


 


And …..


 


16/02/02 20:09:03 INFO impl.ContainerManagementProtocolProxy: Opening proxy : 10.0.0.5:30050
16/02/02 20:33:51 INFO yarn.YarnAllocator: Completed container container_1454421662639_0011_01_05 (state: COMPLETE, exit status: -104)
16/02/02 20:33:51 WARN yarn.YarnAllocator: Container killed by YARN for exceeding memory limits. 16.8 GB of 16.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
16/02/02 20:33:56 INFO yarn.YarnAllocator: Will request 1 executor containers, each with 2 cores and 16768 MB memory including 384 MB overhead
16/02/02 20:33:56 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:16768, vCores:2>)
16/02/02 20:33:57 INFO yarn.YarnAllocator: Launching container container_1454421662639_0011_01_37 for on host 10.0.0.8
16/02/02 20:33:57 INFO yarn.YarnAllocator: Launching ExecutorRunnable. driverUrl: akka.tcp://sparkDriver@10.0.0.15:47446/user/CoarseGrainedScheduler, executorHostname: 10.0.0.8
16/02/02 20:33:57 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.


I'll really appreciate any help here.
 
Thank you,


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

RE: Spark 1.5.2 memory error

2016-02-02 Thread Stefan Panayotov
For the memoryOverhead I have the default of 10% of 16g, and the Spark version is 
1.5.2.

Stefan Panayotov, PhD
Sent from Outlook Mail for Windows 10 phone


From: Ted Yu
Sent: Tuesday, February 2, 2016 4:52 PM
To: Jakob Odersky
Cc: Stefan Panayotov; user@spark.apache.org
Subject: Re: Spark 1.5.2 memory error

What value do you use for spark.yarn.executor.memoryOverhead ?

Please see https://spark.apache.org/docs/latest/running-on-yarn.html for 
description of the parameter.

Which Spark release are you using ?

Cheers

On Tue, Feb 2, 2016 at 1:38 PM, Jakob Odersky <ja...@odersky.com> wrote:
Can you share some code that produces the error? It is probably not
due to spark but rather the way data is handled in the user code.
Does your code call any reduceByKey actions? These are often a source
for OOM errors.

On Tue, Feb 2, 2016 at 1:22 PM, Stefan Panayotov <spanayo...@msn.com> wrote:
> Hi Guys,
>
> I need help with Spark memory errors when executing ML pipelines.
> The error that I see is:
>
>
> 16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 32.0 in
> stage 32.0 (TID 3298)
>
>
> 16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 12.0 in
> stage 32.0 (TID 3278)
>
>
> 16/02/02 20:34:39 INFO MemoryStore: ensureFreeSpace(2004728720) called with
> curMem=296303415, maxMem=8890959790
>
>
> 16/02/02 20:34:39 INFO MemoryStore: Block taskresult_3298 stored as bytes in
> memory (estimated size 1911.9 MB, free 6.1 GB)
>
>
> 16/02/02 20:34:39 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15:
> SIGTERM
>
>
> 16/02/02 20:34:39 ERROR Executor: Exception in task 12.0 in stage 32.0 (TID
> 3278)
>
>
> java.lang.OutOfMemoryError: Java heap space
>
>
>        at java.util.Arrays.copyOf(Arrays.java:2271)
>
>
>        at
> java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
>
>
>        at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
>
>
>        at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
>
>
>        at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>
>        at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>
>        at java.lang.Thread.run(Thread.java:745)
>
>
> 16/02/02 20:34:39 INFO DiskBlockManager: Shutdown hook called
>
>
> 16/02/02 20:34:39 INFO Executor: Finished task 32.0 in stage 32.0 (TID
> 3298). 2004728720 bytes result sent via BlockManager)
>
>
> 16/02/02 20:34:39 ERROR SparkUncaughtExceptionHandler: Uncaught exception in
> thread Thread[Executor task launch worker-8,5,main]
>
>
> java.lang.OutOfMemoryError: Java heap space
>
>
>        at java.util.Arrays.copyOf(Arrays.java:2271)
>
>
>        at
> java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
>
>
>        at
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
>
>
>        at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
>
>
>        at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>
>
>        at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>
>
>        at java.lang.Thread.run(Thread.java:745)
>
>
> 16/02/02 20:34:39 INFO ShutdownHookManager: Shutdown hook called
>
>
> 16/02/02 20:34:39 INFO MetricsSystemImpl: Stopping azure-file-system metrics
> system...
>
>
> 16/02/02 20:34:39 INFO MetricsSinkAdapter: azurefs2 thread interrupted.
>
>
> 16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system
> stopped.
>
>
> 16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system
> shutdown complete.
>
>
>
>
>
> And …..
>
>
>
>
>
> 16/02/02 20:09:03 INFO impl.ContainerManagementProtocolProxy: Opening proxy
> : 10.0.0.5:30050
>
>
> 16/02/02 20:33:51 INFO yarn.YarnAllocator: Completed container
> container_1454421662639_0011_01_05 (state: COMPLETE, exit status: -104)
>
>
> 16/02/02 20:33:51 WARN yarn.YarnAllocator: Container killed by YARN for
> exceeding memory limits. 16.8 GB of 16.5 GB physical memory used. Consider
> boosting spark.yarn.executor.memoryOverhead.
>
>
> 16/02/02 20:33:56 INFO yarn.YarnAllocator: Will request 1 executor
> containers, each with 2 cores and 16768 MB memory including 384 MB overhead
>
>
> 16/02/02 20:33:56 INFO yarn.YarnAllocator: Container request (host: Any,
> capability: <memory:16768, vCores:2>)
>
>
> 16/02/02 20:33:57 INFO yarn.YarnAllocator: Launching container
> container_1454

RE: Python UDFs

2016-01-28 Thread Stefan Panayotov
Thanks, Jacob.
 
But it seems that Python requires the return type to be specified, and DenseVector 
is not a valid return type (or I do not know the correct type to put in).
Shall I try ArrayType?
Any ideas?



Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net

 
> Date: Wed, 27 Jan 2016 15:03:06 -0800
> Subject: Re: Python UDFs
> From: ja...@odersky.com
> To: spanayo...@msn.com
> CC: user@spark.apache.org
> 
> Have you checked:
> 
> - the mllib doc for python
> https://spark.apache.org/docs/1.6.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.DenseVector
> - the udf doc 
> https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.functions.udf
> 
> You should be fine in returning a DenseVector as the return type of
> the udf, as it provides access to a schema.
> 
> These are just directions to explore, I haven't used PySpark myself.
> 
> On Wed, Jan 27, 2016 at 10:38 AM, Stefan Panayotov <spanayo...@msn.com> wrote:
> > Hi,
> >
> > I have defined a UDF in Scala like this:
> >
> > import org.apache.spark.mllib.linalg.Vector
> > import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary,
> > Statistics}
> > import org.apache.spark.mllib.linalg.DenseVector
> >
> > val determineVector = udf((a: Double, b: Double) => {
> > val data: Array[Double] = Array(a,b)
> > val dv = new DenseVector(data)
> > dv
> >   })
> >
> > How can I write the corresponding function in Pyhton/Pyspark?
> >
> > Thanks for your help
> >
> > Stefan Panayotov, PhD
> > Home: 610-355-0919
> > Cell: 610-517-5586
> > email: spanayo...@msn.com
> > spanayo...@outlook.com
> > spanayo...@comcast.net
> >
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
  

Python UDFs

2016-01-27 Thread Stefan Panayotov
Hi,
 
I have defined a UDF in Scala like this:
 
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.DenseVector
 
val determineVector = udf((a: Double, b: Double) => {
val data: Array[Double] = Array(a,b)
val dv = new DenseVector(data)
dv
  })

How can I write the corresponding function in Pyhton/Pyspark?
 
Thanks for your help


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

RE: Spark SQL running totals

2015-10-16 Thread Stefan Panayotov
Thanks Deenar.
This works perfectly.
I can't test the solution with window functions because I am still on Spark 1.3.1.
Hopefully we will move to 1.5 soon.

Stefan Panayotov
Sent from my Windows Phone

From: Deenar Toraskar<mailto:deenar.toras...@gmail.com>
Sent: ‎10/‎15/‎2015 2:35 PM
To: Stefan Panayotov<mailto:spanayo...@msn.com>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Spark SQL running totals

you can do a self join of the table with itself with the join clause being
a.col1 >= b.col1

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but I can't see why it wouldn't work. Doing it with an RDD might
be more efficient; see
https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
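
On Spark 1.4+, the window-function version discussed above can be sketched like this 
(illustrative only; without a partitionBy, all rows are pulled into a single partition):

// Sketch: cumulative sum of col_2 ordered by col_1 (Spark 1.4+).
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// rowsBetween(Long.MinValue, 0) == UNBOUNDED PRECEDING .. CURRENT ROW on 1.x APIs.
val w = Window.orderBy("col_1").rowsBetween(Long.MinValue, 0)
val withRunningTotal = df.withColumn("col_3", sum("col_2").over(w))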

On 15 October 2015 at 18:48, Stefan Panayotov <spanayo...@msn.com> wrote:

> Hi,
>
> I need help with Spark SQL. I need to achieve something like the following.
> If I have data like:
>
> col_1  col_2
> 1 10
> 2 30
> 3 15
> 4 20
> 5 25
>
> I need to get col_3 to be the running total of the sum of the previous
> rows of col_2, e.g.
>
> col_1  col_2  col_3
> 1      10     10
> 2      30     40
> 3      15     55
> 4      20     75
> 5      25     100
>
> Is there a way to achieve this in Spark SQL or maybe with Data frame
> transformations?
>
> Thanks in advance,
>
>
> *Stefan Panayotov, PhD **Home*: 610-355-0919
> *Cell*: 610-517-5586
> *email*: spanayo...@msn.com
> spanayo...@outlook.com
> spanayo...@comcast.net
>
>


RE: Spark SQL running totals

2015-10-15 Thread Stefan Panayotov
Thanks to all of you guys for the helpful suggestions. I'll try these first 
thing tomorrow morning.

Stefan Panayotov
Sent from my Windows Phone

From: java8964<mailto:java8...@hotmail.com>
Sent: ‎10/‎15/‎2015 4:30 PM
To: Michael Armbrust<mailto:mich...@databricks.com>; Deenar 
Toraskar<mailto:deenar.toras...@gmail.com>
Cc: Stefan Panayotov<mailto:spanayo...@msn.com>; 
user@spark.apache.org<mailto:user@spark.apache.org>
Subject: RE: Spark SQL running totals

My mistake. I didn't notice that "UNBOUNDED PRECEDING" is already supported.
So a cumulative sum should work then.
Thanks
Yong

From: java8...@hotmail.com
To: mich...@databricks.com; deenar.toras...@gmail.com
CC: spanayo...@msn.com; user@spark.apache.org
Subject: RE: Spark SQL running totals
Date: Thu, 15 Oct 2015 16:24:39 -0400




Not sure the window functions can work for his case.
If you do a "sum() over (partitioned by)", that will return a total sum per 
partition, instead of the cumulative sum wanted in this case.
I saw there is a "cume_dist", but no "cume_sum".
Do we really have a "cume_sum" in the Spark window functions, or do I totally 
misunderstand "sum() over (partitioned by)"?
Yong

From: mich...@databricks.com
Date: Thu, 15 Oct 2015 11:51:59 -0700
Subject: Re: Spark SQL running totals
To: deenar.toras...@gmail.com
CC: spanayo...@msn.com; user@spark.apache.org

Check out: 
https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
On Thu, Oct 15, 2015 at 11:35 AM, Deenar Toraskar <deenar.toras...@gmail.com> 
wrote:
you can do a self join of the table with itself with the join clause being 
a.col1 >= b.col1

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but I can't see why it wouldn't work. Doing it with an RDD might 
be more efficient; see 
https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/
On 15 October 2015 at 18:48, Stefan Panayotov <spanayo...@msn.com> wrote:



Hi,

I need help with Spark SQL. I need to achieve something like the following.
If I have data like:

col_1  col_2
1 10
2 30
3 15
4 20
5 25

I need to get col_3 to be the running total of the sum of the previous rows of 
col_2, e.g.

col_1  col_2  col_3
1      10     10
2      30     40
3      15     55
4      20     75
5      25     100

Is there a way to achieve this in Spark SQL or maybe with Data frame 
transformations?

Thanks in advance,


Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net







Spark SQL running totals

2015-10-15 Thread Stefan Panayotov
Hi,
 
I need help with Spark SQL. I need to achieve something like the following.
If I have data like:
 
col_1  col_2
1 10
2 30
3 15
4 20
5 25
 
I need to get col_3 to be the running total of the sum of the previous rows of 
col_2, e.g.
 
col_1  col_2  col_3
1      10     10
2      30     40
3      15     55
4      20     75
5      25     100
 
Is there a way to achieve this in Spark SQL or maybe with Data frame 
transformations?
 
Thanks in advance,


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

HiveContext error

2015-08-05 Thread Stefan Panayotov
Hello,
 
I am trying to define an external Hive table from Spark HiveContext like the 
following:
 
import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
 
hiveCtx.sql(s"""CREATE EXTERNAL TABLE IF NOT EXISTS Rentrak_Ratings (Version string, Gen_Date string, Market_Number string, Market_Name string, Time_Zone string, Number_Households string,
 | DateTime string, Program_Start_Time string, Program_End_Time string, Station string, Station_Name string, Call_Sign string, Network_Name string, Program string,
 | Series_Name string, Series_Number string, Episode_Number string, Episode_Title string, Demographic string, Demographic_Name string, HHUniverse string,
 | Share_15min_Segment string, PHUT_15min_Segment string, Rating_15min_Segment string, AV_Audience_15min_Segment string)
 | PARTITIONED BY (year INT, month INT)
 | ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'""".stripMargin)

And I am getting the following error:
 
org.apache.spark.sql.execution.QueryExecutionException: FAILED: Hive Internal 
Error: java.lang.ClassNotFoundException(org.apache.hadoop.hive.ql.hooks.ATSHook)

    at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:324)
    at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:292)
    at org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
    at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
    at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
    at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)
    at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
    at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
    at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:103)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
    at $iwC$$iwC$$iwC.<init>(<console>:45)
    at $iwC$$iwC.<init>(<console>:47)
    at $iwC.<init>(<console>:49)
    at <init>(<console>:51)
    at .<init>(<console>:55)
    at .<clinit>(<console>)
    at .<init>(<console>:7)
    at .<clinit>(<console>)

 
Can anybody help please?


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

FW: Executing spark code in Zeppelin

2015-07-29 Thread Stefan Panayotov


Stefan Panayotov
Sent from my Windows Phone

From: Stefan Panayotov<mailto:spanayo...@msn.com>
Sent: ‎7/‎29/‎2015 8:20 AM
To: user-subscr...@spark.apache.org<mailto:user-subscr...@spark.apache.org>
Subject: Executing spark code in Zeppelin

Hi,
I faced a problem with running long code snippets in a Zeppelin paragraph. If the 
code exceeds a certain limit (I still have to check exactly what the limit is), 
clicking the run button or pressing Shift-Enter does nothing. This effect can be 
demonstrated even by just adding comments to the code.
Has anybody stumbled on such a problem?

Stefan Panayotov
Sent from my Windows Phone


RE: Executing spark code in Zeppelin

2015-07-29 Thread Stefan Panayotov
Hi Silvio,

There are no errors reported; the Zeppelin paragraph just refuses to do anything.
I tested this by just adding comment characters to the code snippet.
Also, I discovered the limit is 3400. Once I go over that, it stops reacting.
 
BTW, thanks for the advice - I sent email to the Zeppelin community.


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net

 
From: silvio.fior...@granturing.com
To: spanayo...@msn.com; user@spark.apache.org
Subject: Re: Executing spark code in Zeppelin
Date: Wed, 29 Jul 2015 12:50:51 +








Hi Stefan,



Might be best to move this over to the Zeppelin user list at 
us...@zeppelin.incubator.apache.org



For now, do you see any errors in the logs? Could be a syntax or compiler error 
that isn’t getting reported back to the UI.



Thanks,
Silvio










From: Stefan Panayotov
Date: Wednesday, July 29, 2015 at 8:21 AM
To: user@spark.apache.org
Subject: FW: Executing spark code in Zeppelin

Stefan Panayotov
Sent from my Windows Phone

From: Stefan Panayotov
Sent: ‎7/‎29/‎2015 8:20 AM
To: user-subscr...@spark.apache.org
Subject: Executing spark code in Zeppelin

Hi,

I faced a problem with running long code snippets in a Zeppelin paragraph. If the 
code exceeds a certain limit (I still have to check exactly what the limit is), 
clicking the run button or pressing Shift-Enter does nothing. This effect can be 
demonstrated even by just adding comments to the code.

Has anybody stumbled on such a problem?

Stefan Panayotov

Sent from my Windows Phone




  

Zeppelin notebook question

2015-07-23 Thread Stefan Panayotov
Hi,

 

When I create a DataFrame through the Spark SQLContext and then register a temp
table, I can use the %sql Zeppelin interpreter to open a nice SQL paragraph.

If, on the other hand, I do the same through the HiveContext, I can't see those
tables in %sql show tables.

Is there a way to query the HiveContext tables in %sql?

 

Thanks,

 

Stefan Panayotov, Ph.D.

email: spanayo...@msn.com

home: 610-355-0919

cell: 610-517-5586

 



RE: Add column to DF

2015-07-21 Thread Stefan Panayotov
This is working!
Thank you so much :)

Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net

 
From: mich...@databricks.com
Date: Tue, 21 Jul 2015 12:08:04 -0700
Subject: Re: Add column to DF
To: spanayo...@msn.com
CC: user@spark.apache.org

Try instead:
import org.apache.spark.sql.functions._
val determineDayPartID = udf((evntStDate: String, evntStHour: String) => {
  val stFormat = new java.text.SimpleDateFormat("yyMMdd")
  var stDateStr: String = evntStDate.substring(2,8)
  val stDate: Date = stFormat.parse(stDateStr)
  val stHour = evntStHour.substring(1,3).toDouble + 0.1
  var bucket = Math.ceil(stHour/3.0).toInt
  val cal: Calendar = Calendar.getInstance
  cal.setTime(stDate)
  var dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
  if (dayOfWeek == 1) dayOfWeek = 8
  if (dayOfWeek > 6) bucket = bucket + 8
  bucket
})

input.withColumn("DayPartID", determineDayPartID(col("StartDate"), col("EventStartHour")))

Add column to DF

2015-07-21 Thread Stefan Panayotov
Hi,
 
I am trying to ad a column to a data frame that I created based on a JSON file 
like this:
 
val input =
hiveCtx.jsonFile("wasb://n...@cmwhdinsightdatastore.blob.core.windows.net/json/*").toDF().persist(StorageLevel.MEMORY_AND_DISK)
 
I have a function that is generating the values for the new column:
 
def determineDayPartID(evntStDate: String, evntStHour: String): Int = {
  val stFormat = new java.text.SimpleDateFormat("yyMMdd")
  var stDateStr: String = evntStDate.substring(2,8)
  val stDate: Date = stFormat.parse(stDateStr)
  val stHour = evntStHour.substring(1,3).toDouble + 0.1
  var bucket = Math.ceil(stHour/3.0).toInt
  val cal: Calendar = Calendar.getInstance
  cal.setTime(stDate)
  var dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
  if (dayOfWeek == 1) dayOfWeek = 8
  if (dayOfWeek > 6) bucket = bucket + 8
  return bucket
}


When I try:
 
input.withColumn("DayPartID", callUDF(determineDayPartID, IntegerType, 
col("StartDate"), col("EventStartHour")))
 
I am getting the error:
 
missing arguments for
method determineDayPartID in object rating; follow this method with `_' if you
want to treat it as a partially applied function

Can you please help?


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

RE: import errors with Eclipse Scala

2015-07-01 Thread Stefan Panayotov
Thanks, Jem.
 
I added scala-compiler.jar from 
C:\Eclipse\eclipse\plugins\org.scala-ide.scala210.jars_4.1.0.201505250838\target\jars,
and it looks like this resolved the issue.
 
Thanks once again.


Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net

 
From: jem.tuc...@gmail.com
Date: Wed, 1 Jul 2015 18:20:51 +
Subject: Re: import errors with Eclipse Scala
To: spanayo...@msn.com; yuzhih...@gmail.com
CC: user@spark.apache.org

in eclipse you can just add the spark assembly jar to the build path: right click 
the project > build path > configure build path > library > add external jars

On Wed, Jul 1, 2015 at 7:15 PM Stefan Panayotov spanayo...@msn.com wrote:



Hi Ted,
 
How can I import the relevant Spark projects into Eclipse?
Do I need to add anything the Java Build Path in the project properties?
 
Also, I have installed sbt on my machine.
Is there a corresponding sbt command to the maven command below?
 
Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net


 
Date: Wed, 1 Jul 2015 10:04:23 -0700
Subject: Re: import errors with Eclipse Scala
From: yuzhih...@gmail.com
To: spanayo...@msn.com
CC: user@spark.apache.org

Have you imported the relevant Spark projects into Eclipse.
You can run command similar to the following to generate project files for 
Spark:
mvn clean package -DskipTests eclipse:eclipse

On Wed, Jul 1, 2015 at 9:57 AM, Stefan Panayotov spanayo...@msn.com wrote:



Hi Team,
 
Just installed Eclipse with Scala plugin to benefit from IDE environment and I 
faced the problem that any import statement gives me an error.
For example:
 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
 
All report errors of type:
 
“object apache is not a member of package org” or
“object json4s is not a member of package org”
 
How can I resolve this?
 
Thanks,



Stefan Panayotov, PhD 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

  

RE: import errors with Eclipse Scala

2015-07-01 Thread Stefan Panayotov
Hi Ted,
 
How can I import the relevant Spark projects into Eclipse?
Do I need to add anything the Java Build Path in the project properties?
 
Also, I have installed sbt on my machine.
Is there a corresponding sbt command to the maven command below?
 
Stefan Panayotov, PhD 
Home: 610-355-0919 
Cell: 610-517-5586 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net


 
Date: Wed, 1 Jul 2015 10:04:23 -0700
Subject: Re: import errors with Eclipse Scala
From: yuzhih...@gmail.com
To: spanayo...@msn.com
CC: user@spark.apache.org

Have you imported the relevant Spark projects into Eclipse.
You can run command similar to the following to generate project files for 
Spark:
mvn clean package -DskipTests eclipse:eclipse

On Wed, Jul 1, 2015 at 9:57 AM, Stefan Panayotov spanayo...@msn.com wrote:



Hi Team,
 
Just installed Eclipse with Scala plugin to benefit from IDE environment and I 
faced the problem that any import statement gives me an error.
For example:
 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._
 
All report errors of type:
 
“object apache is not a member of package org” or
“object json4s is not a member of package org”
 
How can I resolve this?
 
Thanks,



Stefan Panayotov, PhD 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net
  

  

import errors with Eclipse Scala

2015-07-01 Thread Stefan Panayotov
Hi Team,

Just installed Eclipse with the Scala plugin to benefit from an IDE environment and I 
faced the problem that any import statement gives me an error.
For example:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._

All report errors of type:

“object apache is not a member of package org” or
“object json4s is not a member of package org”

How can I resolve this?

Thanks,


Stefan Panayotov, PhD 
email: spanayo...@msn.com 
spanayo...@outlook.com 
spanayo...@comcast.net