RE: we control spark file names before we write them - should we opensource it?
Yes, I think so.

Stefan Panayotov, PhD
spanayo...@outlook.com
spanayo...@comcast.net
spanayo...@gmail.com

-----Original Message-----
From: ilaimalka
Sent: Monday, June 8, 2020 9:17 AM
To: user@spark.apache.org
Subject: we control spark file names before we write them - should we opensource it?

Hi, as part of our work we needed more control over the names of the files written out by Spark: e.g. instead of "part-...csv.gz" we want something like "15988891_1748330679_20200507124153.tsv.gz", where the first number is hardcoded, the second is the value from partitionBy, and the third is a timestamp in a provided SimpleDateFormat.

After a long search through the possibilities, the most common approach we found is to locate those files and rename them *after* the Spark job has finished. We wanted a more efficient way, so we implemented a new DataSource that is a wrapper around the standard Spark file formats (csv, json, text, parquet, avro) and lets us rename each file before it is written.

In short, this is how it works:
- the DataSource extends FileFormat and implements prepareWrite, which redirects to a local FileNameOutputWriterFactory
- a TypeFactory redirects to the original Spark formats
- FileNameOutputWriterFactory does the actual work and, via reflection, can call any implementation to control the file name

The question is: is this interesting/useful enough for the community? Should we open-source it? Thanks!

p.s. we posted the same question on the Spark channel on the ASF Slack if you want to discuss it there: https://the-asf.slack.com/archives/CD5UQDNBA/p1589117451069600

--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscr...@spark.apache.org
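For anyone who wants to experiment with the same idea, here is a minimal sketch of such a wrapper, assuming Spark 2.x. FileFormat and OutputWriterFactory live in the internal org.apache.spark.sql.execution.datasources package, so signatures can change between releases; the wrapper described in the post is format-agnostic and reflection-based, while this sketch hardwires CSV and the naming scheme for brevity:

import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.types.StructType

// Wraps the stock CSV format and rewrites each task's output file name.
class FileNameCsvFormat extends CSVFileFormat {
  override def prepareWrite(
      spark: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory = {
    val delegate = super.prepareWrite(spark, job, options, dataSchema)
    new OutputWriterFactory {
      // Keep the delegate's extension, e.g. ".csv.gz" under a gzip codec
      override def getFileExtension(ctx: TaskAttemptContext): String =
        delegate.getFileExtension(ctx)

      // Swap the framework-generated "part-..." segment for a custom name
      override def newInstance(
          path: String,
          dataSchema: StructType,
          ctx: TaskAttemptContext): OutputWriter = {
        val dir = path.substring(0, path.lastIndexOf('/'))
        val ts = new java.text.SimpleDateFormat("yyyyMMddHHmmss")
          .format(new java.util.Date)
        delegate.newInstance(
          s"$dir/15988891_$ts${getFileExtension(ctx)}", dataSchema, ctx)
      }
    }
  }
}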
RE: spark 2 new stuff
To me Delta is very valuable.

Stefan Panayotov, PhD
spanayo...@outlook.com
spanayo...@comcast.net
Cell: 610-517-5586

From: Mich Talebzadeh [mailto:mich.talebza...@gmail.com]
Sent: Monday, February 26, 2018 9:26 AM
To: user @spark <user@spark.apache.org>
Subject: spark 2 new stuff

Just a quick query. From a practitioner's point of view, what new stuff in Spark 2 has been the most value for money? I hear different and often conflicting things; however, I was wondering whether the user group has more practical takes.

regards,

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
RE: How to output RDD to one file with specific name?
You can do something like:

dbutils.fs.cp("/foo/part-0", "/foo/my-data.csv")

Stefan Panayotov, PhD
spanayo...@outlook.com
spanayo...@comcast.net
Cell: 610-517-5586
Home: 610-355-0919

From: Gavin Yue [mailto:yue.yuany...@gmail.com]
Sent: Thursday, August 25, 2016 1:15 PM
To: user <user@spark.apache.org>
Subject: How to output RDD to one file with specific name?

I am trying to output an RDD to disk with:

rdd.coalesce(1).saveAsTextFile("/foo")

It outputs to the foo folder with a file named Part-0. Is there a way I could directly save the file as /foo/somename?

Thanks.
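dbutils is Databricks-specific. Outside Databricks, a similar post-write rename can be done with the Hadoop FileSystem API; a sketch, with the paths and target name illustrative:

import org.apache.hadoop.fs.{FileSystem, Path}

rdd.coalesce(1).saveAsTextFile("/foo")

// Locate the single part file and rename it in place
val fs = FileSystem.get(sc.hadoopConfiguration)
val part = fs.globStatus(new Path("/foo/part-*"))(0).getPath
fs.rename(part, new Path("/foo/somename"))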
adding rows to a DataFrame
Hi,

I have a problem that requires me to go through the rows in a DataFrame (or possibly through rows in a JSON file) and conditionally add rows, depending on a value in one of the columns of each existing row. So, for example, if I have:

+---+---+---+
| _1| _2| _3|
+---+---+---+
|ID1|100|1.1|
|ID2|200|2.2|
|ID3|300|3.3|
|ID4|400|4.4|
+---+---+---+

I need to be able to get:

+---+---+---+--------------------+---+
| _1| _2| _3|                  _4| _5|
+---+---+---+--------------------+---+
|ID1|100|1.1|ID1 add text or d...| 25|
|id11 ..|21 |
|id12 ..|22 |
|ID2|200|2.2|ID2 add text or d...| 50|
|id21 ..|33 |
|id22 ..|34 |
|id23 ..|35 |
|ID3|300|3.3|ID3 add text or d...| 75|
|id31 ..|11 |
|ID4|400|4.4|ID4 add text or d...|100|
|id41 ..|51 |
|id42 ..|52 |
|id43 ..|53 |
|id44 ..|54 |
+---+---+---+--------------------+---+

How can I achieve this in Spark without doing DF.collect(), which will get everything to the driver and, for a big data set, will give me an OOM?

BTW, I know how to use withColumn() to add new columns to the DataFrame. I need to also add new rows. Any help will be appreciated.

Thanks,

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net
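One way to avoid collect() is to expand each row on the executors with flatMap and rebuild the DataFrame from the result. A rough sketch, assuming the child rows can be derived from the parent row alone; all column names and derived values here are illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Widen the original schema with the two new columns
val newSchema = StructType(df.schema.fields ++
  Seq(StructField("_4", StringType, nullable = true),
      StructField("_5", IntegerType, nullable = true)))

val expanded = df.rdd.flatMap { row =>
  val id = row.getString(0)
  // Emit the enriched parent row, then any number of derived child rows
  val parent = Row.fromSeq(row.toSeq ++ Seq(s"$id add text or data", 25))
  val children = Seq(Row(s"${id}1 ..", null, null, null, 21))
  parent +: children
}

val result = sqlContext.createDataFrame(expanded, newSchema)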
RE: Spark 1.5.2 memory error
I drastically increased the memory:

spark.executor.memory = 50g
spark.driver.memory = 8g
spark.driver.maxResultSize = 8g
spark.yarn.executor.memoryOverhead = 768

I still see executors killed, but this time memory does not seem to be the issue. The error in the Jupyter notebook is:

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: java.io.IOException: Failed to connect to /10.0.0.9:48755

From the nodemanager log corresponding to worker 10.0.0.9:

2016-02-03 17:31:44,917 INFO yarn.YarnShuffleService (YarnShuffleService.java:initializeApplication(129)) - Initializing application application_1454509557526_0014
2016-02-03 17:31:44,918 INFO container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from LOCALIZING to LOCALIZED
2016-02-03 17:31:44,947 INFO container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from LOCALIZED to RUNNING
2016-02-03 17:31:44,951 INFO nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:buildCommandExecutor(267)) - launchContainer: [bash, /mnt/resource/hadoop/yarn/local/usercache/root/appcache/application_1454509557526_0014/container_1454509557526_0014_01_93/default_container_executor.sh]
2016-02-03 17:31:45,686 INFO monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(371)) - Starting resource-monitoring for container_1454509557526_0014_01_93
2016-02-03 17:31:45,686 INFO monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for container_1454509557526_0014_01_11

Then I can see the memory usage increasing from 230.6 MB to 12.6 GB, which is far below 50g, and then it suddenly gets killed!?!
2016-02-03 17:33:17,350 INFO monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 30962 for container-id container_1454509557526_0014_01_93: 12.6 GB of 51 GB physical memory used; 52.8 GB of 107.1 GB virtual memory used
2016-02-03 17:33:17,613 INFO container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from RUNNING to KILLING
2016-02-03 17:33:17,613 INFO launcher.ContainerLaunch (ContainerLaunch.java:cleanupContainer(370)) - Cleaning up container container_1454509557526_0014_01_93
2016-02-03 17:33:17,629 WARN nodemanager.DefaultContainerExecutor (DefaultContainerExecutor.java:launchContainer(223)) - Exit code from container container_1454509557526_0014_01_93 is : 143
2016-02-03 17:33:17,667 INFO container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from KILLING to CONTAINER_CLEANEDUP_AFTER_KILL
2016-02-03 17:33:17,669 INFO nodemanager.NMAuditLogger (NMAuditLogger.java:logSuccess(89)) - USER=root OPERATION=Container Finished - Killed TARGET=ContainerImpl RESULT=SUCCESS APPID=application_1454509557526_0014 CONTAINERID=container_1454509557526_0014_01_93
2016-02-03 17:33:17,670 INFO container.ContainerImpl (ContainerImpl.java:handle(1131)) - Container container_1454509557526_0014_01_93 transitioned from CONTAINER_CLEANEDUP_AFTER_KILL to DONE
2016-02-03 17:33:17,670 INFO application.ApplicationImpl (ApplicationImpl.java:transition(347)) - Removing container_1454509557526_0014_01_93 from application application_1454509557526_0014
2016-02-03 17:33:17,671 INFO logaggregation.AppLogAggregatorImpl (AppLogAggregatorImpl.java:startContainerLogAggregation(546)) - Considering container container_1454509557526_0014_01_93 for log-aggregation
2016-02-03 17:33:17,671 INFO containermanager.AuxServices (AuxServices.java:handle(196)) - Got event CONTAINER_STOP for appId application_1454509557526_0014
2016-02-03 17:33:17,671 INFO yarn.YarnShuffleService (YarnShuffleService.java:stopContainer(161)) - Stopping container container_1454509557526_0014_01_93
2016-02-03 17:33:20,351 INFO monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(385)) - Stopping resource-monitoring for container_1454509557526_0014_01_93
2016-02-03 17:33:20,383 INFO monitor.ContainersMonitorImpl (ContainersMonitorImpl.java:run(458)) - Memory usage of ProcessTree 28727 for container-id container_1454509557526_0012_01_01: 319.8 MB of 1.5 GB physical memory used; 1.7 GB of 3.1 GB virtual memory used
2016-02-03 17:33:22,627 INFO nodemanager.NodeStatusUpdaterImpl (NodeStatusUpdaterImpl.java:removeOrTrackCompletedContainersFromContext(529)) - Removed completed containers from NM context: [container_1454509557526_0014_01_93]

I'll appreciate any suggestions.

Thanks,

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.
Spark 1.5.2 memory error
Hi Guys,

I need help with Spark memory errors when executing ML pipelines. The error that I see is:

16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 32.0 in stage 32.0 (TID 3298)
16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 12.0 in stage 32.0 (TID 3278)
16/02/02 20:34:39 INFO MemoryStore: ensureFreeSpace(2004728720) called with curMem=296303415, maxMem=8890959790
16/02/02 20:34:39 INFO MemoryStore: Block taskresult_3298 stored as bytes in memory (estimated size 1911.9 MB, free 6.1 GB)
16/02/02 20:34:39 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
16/02/02 20:34:39 ERROR Executor: Exception in task 12.0 in stage 32.0 (TID 3278)
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/02/02 20:34:39 INFO DiskBlockManager: Shutdown hook called
16/02/02 20:34:39 INFO Executor: Finished task 32.0 in stage 32.0 (TID 3298). 2004728720 bytes result sent via BlockManager)
16/02/02 20:34:39 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-8,5,main]
java.lang.OutOfMemoryError: Java heap space
        at java.util.Arrays.copyOf(Arrays.java:2271)
        at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
16/02/02 20:34:39 INFO ShutdownHookManager: Shutdown hook called
16/02/02 20:34:39 INFO MetricsSystemImpl: Stopping azure-file-system metrics system...
16/02/02 20:34:39 INFO MetricsSinkAdapter: azurefs2 thread interrupted.
16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system stopped.
16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system shutdown complete.

And ...

16/02/02 20:09:03 INFO impl.ContainerManagementProtocolProxy: Opening proxy : 10.0.0.5:30050
16/02/02 20:33:51 INFO yarn.YarnAllocator: Completed container container_1454421662639_0011_01_05 (state: COMPLETE, exit status: -104)
16/02/02 20:33:51 WARN yarn.YarnAllocator: Container killed by YARN for exceeding memory limits. 16.8 GB of 16.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
16/02/02 20:33:56 INFO yarn.YarnAllocator: Will request 1 executor containers, each with 2 cores and 16768 MB memory including 384 MB overhead
16/02/02 20:33:56 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:16768, vCores:2>)
16/02/02 20:33:57 INFO yarn.YarnAllocator: Launching container container_1454421662639_0011_01_37 for on host 10.0.0.8
16/02/02 20:33:57 INFO yarn.YarnAllocator: Launching ExecutorRunnable. driverUrl: akka.tcp://sparkDriver@10.0.0.15:47446/user/CoarseGrainedScheduler, executorHostname: 10.0.0.8
16/02/02 20:33:57 INFO yarn.YarnAllocator: Received 1 containers from YARN, launching executors on 1 of them.

I'll really appreciate any help here.

Thank you,

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net
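For reference, the overhead that the WARN line above suggests boosting is configured like this on Spark 1.x (the value below is illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .set("spark.executor.memory", "16g")
  // Extra off-heap headroom YARN reserves on top of the executor heap,
  // in MB. Raising it from the 384 MB seen in the log gives the JVM's
  // non-heap memory (threads, NIO buffers, etc.) more room before YARN
  // kills the container with exit status -104.
  .set("spark.yarn.executor.memoryOverhead", "2048")
val sc = new SparkContext(conf)

The same setting can be passed at submit time with --conf spark.yarn.executor.memoryOverhead=2048.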
RE: Spark 1.5.2 memory error
For the memoryOverhead I have the default of 10% of 16g, and the Spark version is 1.5.2.

Stefan Panayotov, PhD
Sent from Outlook Mail for Windows 10 phone

From: Ted Yu
Sent: Tuesday, February 2, 2016 4:52 PM
To: Jakob Odersky
Cc: Stefan Panayotov; user@spark.apache.org
Subject: Re: Spark 1.5.2 memory error

What value do you use for spark.yarn.executor.memoryOverhead?

Please see https://spark.apache.org/docs/latest/running-on-yarn.html for a description of the parameter.

Which Spark release are you using?

Cheers

On Tue, Feb 2, 2016 at 1:38 PM, Jakob Odersky <ja...@odersky.com> wrote:
Can you share some code that produces the error? It is probably not due to Spark but rather the way data is handled in the user code. Does your code call any reduceByKey actions? These are often a source of OOM errors.

On Tue, Feb 2, 2016 at 1:22 PM, Stefan Panayotov <spanayo...@msn.com> wrote:
> Hi Guys,
>
> I need help with Spark memory errors when executing ML pipelines.
> The error that I see is:
>
> 16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 32.0 in stage 32.0 (TID 3298)
> 16/02/02 20:34:17 INFO Executor: Executor is trying to kill task 12.0 in stage 32.0 (TID 3278)
> 16/02/02 20:34:39 INFO MemoryStore: ensureFreeSpace(2004728720) called with curMem=296303415, maxMem=8890959790
> 16/02/02 20:34:39 INFO MemoryStore: Block taskresult_3298 stored as bytes in memory (estimated size 1911.9 MB, free 6.1 GB)
> 16/02/02 20:34:39 ERROR CoarseGrainedExecutorBackend: RECEIVED SIGNAL 15: SIGTERM
> 16/02/02 20:34:39 ERROR Executor: Exception in task 12.0 in stage 32.0 (TID 3278)
> java.lang.OutOfMemoryError: Java heap space
>         at java.util.Arrays.copyOf(Arrays.java:2271)
>         at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> 16/02/02 20:34:39 INFO DiskBlockManager: Shutdown hook called
> 16/02/02 20:34:39 INFO Executor: Finished task 32.0 in stage 32.0 (TID 3298). 2004728720 bytes result sent via BlockManager)
> 16/02/02 20:34:39 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker-8,5,main]
> java.lang.OutOfMemoryError: Java heap space
>         at java.util.Arrays.copyOf(Arrays.java:2271)
>         at java.io.ByteArrayOutputStream.toByteArray(ByteArrayOutputStream.java:191)
>         at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:86)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:256)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:745)
> 16/02/02 20:34:39 INFO ShutdownHookManager: Shutdown hook called
> 16/02/02 20:34:39 INFO MetricsSystemImpl: Stopping azure-file-system metrics system...
> 16/02/02 20:34:39 INFO MetricsSinkAdapter: azurefs2 thread interrupted.
> 16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system stopped.
> 16/02/02 20:34:39 INFO MetricsSystemImpl: azure-file-system metrics system shutdown complete.
>
> And ...
>
> 16/02/02 20:09:03 INFO impl.ContainerManagementProtocolProxy: Opening proxy : 10.0.0.5:30050
> 16/02/02 20:33:51 INFO yarn.YarnAllocator: Completed container container_1454421662639_0011_01_05 (state: COMPLETE, exit status: -104)
> 16/02/02 20:33:51 WARN yarn.YarnAllocator: Container killed by YARN for exceeding memory limits. 16.8 GB of 16.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
> 16/02/02 20:33:56 INFO yarn.YarnAllocator: Will request 1 executor containers, each with 2 cores and 16768 MB memory including 384 MB overhead
> 16/02/02 20:33:56 INFO yarn.YarnAllocator: Container request (host: Any, capability: <memory:16768, vCores:2>)
> 16/02/02 20:33:57 INFO yarn.YarnAllocator: Launching container container_1454
RE: Python UDFs
Thanks, Jakob. But it seems that Python requires the return type to be specified, and DenseVector is not a valid return type, or I do not know the correct type to put in. Shall I try ArrayType? Any ideas?

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net

> Date: Wed, 27 Jan 2016 15:03:06 -0800
> Subject: Re: Python UDFs
> From: ja...@odersky.com
> To: spanayo...@msn.com
> CC: user@spark.apache.org
>
> Have you checked:
>
> - the mllib doc for python
>   https://spark.apache.org/docs/1.6.0/api/python/pyspark.mllib.html#pyspark.mllib.linalg.DenseVector
> - the udf doc
>   https://spark.apache.org/docs/1.6.0/api/python/pyspark.sql.html#pyspark.sql.functions.udf
>
> You should be fine in returning a DenseVector as the return type of
> the udf, as it provides access to a schema.
>
> These are just directions to explore, I haven't used PySpark myself.
>
> On Wed, Jan 27, 2016 at 10:38 AM, Stefan Panayotov <spanayo...@msn.com> wrote:
> > Hi,
> >
> > I have defined a UDF in Scala like this:
> >
> > import org.apache.spark.mllib.linalg.Vector
> > import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
> > import org.apache.spark.mllib.linalg.DenseVector
> >
> > val determineVector = udf((a: Double, b: Double) => {
> >   val data: Array[Double] = Array(a, b)
> >   val dv = new DenseVector(data)
> >   dv
> > })
> >
> > How can I write the corresponding function in Python/PySpark?
> >
> > Thanks for your help
> >
> > Stefan Panayotov, PhD
> > Home: 610-355-0919
> > Cell: 610-517-5586
> > email: spanayo...@msn.com
> > spanayo...@outlook.com
> > spanayo...@comcast.net

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Python UDFs
Hi,

I have defined a UDF in Scala like this:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.mllib.linalg.DenseVector

val determineVector = udf((a: Double, b: Double) => {
  val data: Array[Double] = Array(a, b)
  val dv = new DenseVector(data)
  dv
})

How can I write the corresponding function in Python/PySpark?

Thanks for your help

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net
RE: Spark SQL running totals
Thanks Deenar. This works perfectly. I can't test the solution with window functions because I am still on Spark 1.3.1. Hopefully we will move to 1.5 soon.

Stefan Panayotov
Sent from my Windows Phone

From: Deenar Toraskar
Sent: 10/15/2015 2:35 PM
To: Stefan Panayotov
Cc: user@spark.apache.org
Subject: Re: Spark SQL running totals

You can do a self join of the table with itself, with the join clause being a.col1 >= b.col1:

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but can't see why it can't work; doing it in an RDD might be more efficient, though. See https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/

On 15 October 2015 at 18:48, Stefan Panayotov <spanayo...@msn.com> wrote:
> Hi,
>
> I need help with Spark SQL. I need to achieve something like the following.
> If I have data like:
>
> col_1  col_2
> 1      10
> 2      30
> 3      15
> 4      20
> 5      25
>
> I need to get col_3 to be the running total of the sum of the previous rows of col_2, e.g.
>
> col_1  col_2  col_3
> 1      10     10
> 2      30     40
> 3      15     55
> 4      20     75
> 5      25     100
>
> Is there a way to achieve this in Spark SQL or maybe with DataFrame transformations?
>
> Thanks in advance,
>
> Stefan Panayotov, PhD
> Home: 610-355-0919
> Cell: 610-517-5586
> email: spanayo...@msn.com
> spanayo...@outlook.com
> spanayo...@comcast.net
RE: Spark SQL running totals
Thanks to all of you guys for the helpful suggestions. I'll try these first thing tomorrow morning.

Stefan Panayotov
Sent from my Windows Phone

From: java8964
Sent: 10/15/2015 4:30 PM
To: Michael Armbrust; Deenar Toraskar
Cc: Stefan Panayotov; user@spark.apache.org
Subject: RE: Spark SQL running totals

My mistake. I didn't notice "UNBOUNDED PRECEDING" is already supported, so the cumulative sum should work then.

Thanks

Yong

From: java8...@hotmail.com
To: mich...@databricks.com; deenar.toras...@gmail.com
CC: spanayo...@msn.com; user@spark.apache.org
Subject: RE: Spark SQL running totals
Date: Thu, 15 Oct 2015 16:24:39 -0400

Not sure the window functions can work for his case. If you do a "sum() over (partitioned by)", that will return a total sum per partition, instead of the cumulative sum wanted in this case. I saw there is a "cume_dist", but no "cume_sum". Do we really have a "cume_sum" in the Spark window functions, or do I totally misunderstand "sum() over (partitioned by)"?

Yong

From: mich...@databricks.com
Date: Thu, 15 Oct 2015 11:51:59 -0700
Subject: Re: Spark SQL running totals
To: deenar.toras...@gmail.com
CC: spanayo...@msn.com; user@spark.apache.org

Check out: https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html

On Thu, Oct 15, 2015 at 11:35 AM, Deenar Toraskar <deenar.toras...@gmail.com> wrote:
You can do a self join of the table with itself, with the join clause being a.col1 >= b.col1:

select a.col1, a.col2, sum(b.col2)
from tablea as a left outer join tablea as b on (a.col1 >= b.col1)
group by a.col1, a.col2

I haven't tried it, but can't see why it can't work; doing it in an RDD might be more efficient. See https://bzhangusc.wordpress.com/2014/06/21/calculate-running-sums/

On 15 October 2015 at 18:48, Stefan Panayotov <spanayo...@msn.com> wrote:
Hi,

I need help with Spark SQL. I need to achieve something like the following. If I have data like:

col_1  col_2
1      10
2      30
3      15
4      20
5      25

I need to get col_3 to be the running total of the sum of the previous rows of col_2, e.g.

col_1  col_2  col_3
1      10     10
2      30     40
3      15     55
4      20     75
5      25     100

Is there a way to achieve this in Spark SQL or maybe with DataFrame transformations?

Thanks in advance,

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net
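For completeness, the cumulative sum from the blog post looks like this in the DataFrame API. This assumes Spark 1.4+ (on 1.4/1.5, window functions require a HiveContext), where the unbounded frame is expressed with Long.MinValue:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.sum

// Running total of col_2 ordered by col_1, i.e. the frame
// UNBOUNDED PRECEDING .. CURRENT ROW
val w = Window.orderBy("col_1").rowsBetween(Long.MinValue, 0)
df.withColumn("col_3", sum("col_2").over(w)).show()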
Spark SQL running totals
Hi,

I need help with Spark SQL. I need to achieve something like the following. If I have data like:

col_1  col_2
1      10
2      30
3      15
4      20
5      25

I need to get col_3 to be the running total of the sum of the previous rows of col_2, e.g.

col_1  col_2  col_3
1      10     10
2      30     40
3      15     55
4      20     75
5      25     100

Is there a way to achieve this in Spark SQL or maybe with DataFrame transformations?

Thanks in advance,

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net
HiveContext error
Hello,

I am trying to define an external Hive table from the Spark HiveContext like the following:

import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
hiveCtx.sql(s"""CREATE EXTERNAL TABLE IF NOT EXISTS Rentrak_Ratings (Version string, Gen_Date string, Market_Number string, Market_Name string, Time_Zone string, Number_Households string,
  | DateTime string, Program_Start_Time string, Program_End_Time string, Station string, Station_Name string, Call_Sign string, Network_Name string, Program string,
  | Series_Name string, Series_Number string, Episode_Number string, Episode_Title string, Demographic string, Demographic_Name string, HHUniverse string,
  | Share_15min_Segment string, PHUT_15min_Segment string, Rating_15min_Segment string, AV_Audience_15min_Segment string)
  | PARTITIONED BY (year INT, month INT)
  | ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'""".stripMargin)

And I am getting the following error:

org.apache.spark.sql.execution.QueryExecutionException: FAILED: Hive Internal Error: java.lang.ClassNotFoundException(org.apache.hadoop.hive.ql.hooks.ATSHook)
        at org.apache.spark.sql.hive.HiveContext.runHive(HiveContext.scala:324)
        at org.apache.spark.sql.hive.HiveContext.runSqlHive(HiveContext.scala:292)
        at org.apache.spark.sql.hive.execution.HiveNativeCommand.run(HiveNativeCommand.scala:33)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:54)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:54)
        at org.apache.spark.sql.execution.ExecutedCommand.execute(commands.scala:64)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd$lzycompute(SQLContext.scala:1099)
        at org.apache.spark.sql.SQLContext$QueryExecution.toRdd(SQLContext.scala:1099)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:147)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:51)
        at org.apache.spark.sql.hive.HiveContext.sql(HiveContext.scala:103)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:27)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:39)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:41)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:43)
        at $iwC$$iwC$$iwC.<init>(<console>:45)
        at $iwC$$iwC.<init>(<console>:47)
        at $iwC.<init>(<console>:49)
        at <init>(<console>:51)
        at .<init>(<console>:55)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)

Can anybody help please?

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net
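ATSHook is Hive's YARN timeline-server hook, so this ClassNotFoundException usually means hive-site.xml registers the hook but its jar is not on Spark's classpath. One possible workaround, assuming the hooks are not actually needed, is to clear the standard Hive hook properties for the session before issuing DDL:

// Clear Hive's execution hooks so DDL statements do not try to load
// ATSHook (these are standard Hive properties; only relevant if your
// hive-site.xml sets them)
hiveCtx.sql("SET hive.exec.pre.hooks=")
hiveCtx.sql("SET hive.exec.post.hooks=")
hiveCtx.sql("SET hive.exec.failure.hooks=")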
FW: Executing spark code in Zeppelin
Stefan Panayotov
Sent from my Windows Phone

From: Stefan Panayotov <spanayo...@msn.com>
Sent: 7/29/2015 8:20 AM
To: user-subscr...@spark.apache.org
Subject: Executing spark code in Zeppelin

Hi,

I faced a problem with running long code snippets in a Zeppelin paragraph. If the code passes a certain limit (I still have to check exactly what the limit is), clicking on the run button or pressing Shift-Enter does nothing. This effect can be demonstrated even by just adding comments to the code. Has anybody stumbled on such a problem?

Stefan Panayotov
Sent from my Windows Phone
RE: Executing spark code in Zeppelin
Hi Silvio,

There are no errors reported; the Zeppelin paragraph just refuses to do anything. I tested this by adding comment characters to the code snippet. Also, I discovered the limit is 3400; once I go over that, it stops reacting.

BTW, thanks for the advice - I sent an email to the Zeppelin community.

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net

From: silvio.fior...@granturing.com
To: spanayo...@msn.com; user@spark.apache.org
Subject: Re: Executing spark code in Zeppelin
Date: Wed, 29 Jul 2015 12:50:51 +

Hi Stefan,

It might be best to move this over to the Zeppelin user list at us...@zeppelin.incubator.apache.org

For now, do you see any errors in the logs? It could be a syntax or compiler error that isn't getting reported back to the UI.

Thanks,
Silvio

From: Stefan Panayotov
Date: Wednesday, July 29, 2015 at 8:21 AM
To: user@spark.apache.org
Subject: FW: Executing spark code in Zeppelin

Stefan Panayotov
Sent from my Windows Phone

From: Stefan Panayotov
Sent: 7/29/2015 8:20 AM
To: user-subscr...@spark.apache.org
Subject: Executing spark code in Zeppelin

Hi,

I faced a problem with running long code snippets in a Zeppelin paragraph. If the code passes a certain limit (I still have to check exactly what the limit is), clicking on the run button or pressing Shift-Enter does nothing. This effect can be demonstrated even by just adding comments to the code. Has anybody stumbled on such a problem?

Stefan Panayotov
Sent from my Windows Phone
Zeppelin notebook question
Hi,

When I create a DataFrame through the Spark SQLContext and then register a temp table, I can use the %sql Zeppelin interpreter to open a nice SQL paragraph. If, on the other hand, I do the same through the HiveContext, I can't see those tables in %sql show tables. Is there a way to query the HiveContext tables in %sql?

Thanks,

Stefan Panayotov, Ph.D.
email: spanayo...@msn.com
home: 610-355-0919
cell: 610-517-5586
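One thing to try, assuming Zeppelin's shared sqlContext was built with Hive support: register the temp table on that shared context rather than on a separately constructed HiveContext, since %sql only queries the interpreter's own context (the table and query below are illustrative):

// In a Spark paragraph, reuse the interpreter's shared context
val df = sqlContext.sql("SELECT * FROM some_hive_table")
df.registerTempTable("my_table")
// then in another paragraph:  %sql select * from my_table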
RE: Add column to DF
This is working! Thank you so much :)

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net

From: mich...@databricks.com
Date: Tue, 21 Jul 2015 12:08:04 -0700
Subject: Re: Add column to DF
To: spanayo...@msn.com
CC: user@spark.apache.org

Try instead:

import java.util.{Calendar, Date}
import org.apache.spark.sql.functions._

val determineDayPartID = udf((evntStDate: String, evntStHour: String) => {
  val stFormat = new java.text.SimpleDateFormat("yyMMdd")
  var stDateStr: String = evntStDate.substring(2, 8)
  val stDate: Date = stFormat.parse(stDateStr)
  val stHour = evntStHour.substring(1, 3).toDouble + 0.1
  var bucket = Math.ceil(stHour / 3.0).toInt
  val cal: Calendar = Calendar.getInstance
  cal.setTime(stDate)
  var dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
  if (dayOfWeek == 1) dayOfWeek = 8
  if (dayOfWeek > 6) bucket = bucket + 8
  bucket
})

input.withColumn("DayPartID", determineDayPartID(col("StartDate"), col("EventStartHour")))
Add column to DF
Hi,

I am trying to add a column to a DataFrame that I created based on a JSON file, like this:

val input = hiveCtx.jsonFile("wasb://n...@cmwhdinsightdatastore.blob.core.windows.net/json/*").toDF().persist(StorageLevel.MEMORY_AND_DISK)

I have a function that is generating the values for the new column:

def determineDayPartID(evntStDate: String, evntStHour: String): Int = {
  val stFormat = new java.text.SimpleDateFormat("yyMMdd")
  var stDateStr: String = evntStDate.substring(2, 8)
  val stDate: Date = stFormat.parse(stDateStr)
  val stHour = evntStHour.substring(1, 3).toDouble + 0.1
  var bucket = Math.ceil(stHour / 3.0).toInt
  val cal: Calendar = Calendar.getInstance
  cal.setTime(stDate)
  var dayOfWeek = cal.get(Calendar.DAY_OF_WEEK)
  if (dayOfWeek == 1) dayOfWeek = 8
  if (dayOfWeek > 6) bucket = bucket + 8
  return bucket
}

When I try:

input.withColumn("DayPartID", callUDF(determineDayPartID, IntegerType, col("StartDate"), col("EventStartHour")))

I am getting the error:

missing arguments for method determineDayPartID in object rating; follow this method with `_' if you want to treat it as a partially applied function

Can you please help?

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net
RE: import errors with Eclipse Scala
Thanks, Jem.

I added scala-compiler.jar from C:\Eclipse\eclipse\plugins\org.scala-ide.scala210.jars_4.1.0.201505250838\target\jars and it looks like this resolved the issue.

Thanks once again.

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net

From: jem.tuc...@gmail.com
Date: Wed, 1 Jul 2015 18:20:51 +
Subject: Re: import errors with Eclipse Scala
To: spanayo...@msn.com; yuzhih...@gmail.com
CC: user@spark.apache.org

In Eclipse you can just add the Spark assembly jar to the build path: right click the project > build path > configure build path > library > add external jars

On Wed, Jul 1, 2015 at 7:15 PM Stefan Panayotov <spanayo...@msn.com> wrote:
Hi Ted,

How can I import the relevant Spark projects into Eclipse? Do I need to add anything to the Java Build Path in the project properties?

Also, I have installed sbt on my machine. Is there a corresponding sbt command to the maven command below?

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net

Date: Wed, 1 Jul 2015 10:04:23 -0700
Subject: Re: import errors with Eclipse Scala
From: yuzhih...@gmail.com
To: spanayo...@msn.com
CC: user@spark.apache.org

Have you imported the relevant Spark projects into Eclipse? You can run a command similar to the following to generate project files for Spark:

mvn clean package -DskipTests eclipse:eclipse

On Wed, Jul 1, 2015 at 9:57 AM, Stefan Panayotov <spanayo...@msn.com> wrote:
Hi Team,

Just installed Eclipse with the Scala plugin to benefit from an IDE environment, and I faced the problem that any import statement gives me an error. For example:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._

All report errors of type: “object apache is not a member of package org” or “object json4s is not a member of package org”

How can I resolve this?

Thanks,

Stefan Panayotov, PhD
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net
RE: import errors with Eclipse Scala
Hi Ted,

How can I import the relevant Spark projects into Eclipse? Do I need to add anything to the Java Build Path in the project properties?

Also, I have installed sbt on my machine. Is there a corresponding sbt command to the maven command below?

Stefan Panayotov, PhD
Home: 610-355-0919
Cell: 610-517-5586
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net

Date: Wed, 1 Jul 2015 10:04:23 -0700
Subject: Re: import errors with Eclipse Scala
From: yuzhih...@gmail.com
To: spanayo...@msn.com
CC: user@spark.apache.org

Have you imported the relevant Spark projects into Eclipse? You can run a command similar to the following to generate project files for Spark:

mvn clean package -DskipTests eclipse:eclipse

On Wed, Jul 1, 2015 at 9:57 AM, Stefan Panayotov <spanayo...@msn.com> wrote:
Hi Team,

Just installed Eclipse with the Scala plugin to benefit from an IDE environment, and I faced the problem that any import statement gives me an error. For example:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._

All report errors of type: “object apache is not a member of package org” or “object json4s is not a member of package org”

How can I resolve this?

Thanks,

Stefan Panayotov, PhD
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net
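On the sbt side there is no built-in equivalent of mvn eclipse:eclipse, but the community sbteclipse plugin adds one; the plugin version below is illustrative, so check the plugin's page for the release matching your sbt:

// project/plugins.sbt
addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "4.0.0")

// then generate the Eclipse project files with:
//   sbt eclipse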
import errors with Eclipse Scala
Hi Team,

Just installed Eclipse with the Scala plugin to benefit from an IDE environment, and I faced the problem that any import statement gives me an error. For example:

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods
import org.json4s.jackson.JsonMethods._

All report errors of type: “object apache is not a member of package org” or “object json4s is not a member of package org”

How can I resolve this?

Thanks,

Stefan Panayotov, PhD
email: spanayo...@msn.com
spanayo...@outlook.com
spanayo...@comcast.net