Re: [Structured Streaming] Custom StateStoreProvider

2018-07-10 Thread Tathagata Das
Note that this is not a public API yet, hence it is not well documented. Use
it at your own risk :)

On Tue, Jul 10, 2018 at 11:04 AM, subramgr 
wrote:

> Hi,
>
> This *trait* looks very daunting. Is there a blog post or an article that
> explains how to implement this *trait*?
>
> Thanks
> Girish
>
>
>


[Structured Streaming] Fine tuning GC performance

2018-07-10 Thread subramgr
Hi,

Are there any specific methods to fine-tune GC for our Structured Streaming
job, or is it similar to the RDD-oriented guidance mentioned here:
https://spark.apache.org/docs/latest/tuning.html
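
For what it's worth, the GC advice on that page (G1GC plus GC logging flags)
carries over to Structured Streaming executors. A minimal sketch of how those
flags might be supplied, assuming a deployment where executor JVMs are launched
after this configuration is read (the app name and the exact flags are
illustrative):

import org.apache.spark.sql.SparkSession

object StreamingGcTuningSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("structured-streaming-gc-tuning")
      // GC flags recommended in the general tuning guide; executor JVMs pick
      // these up at launch, so they apply to streaming executors as well.
      .config("spark.executor.extraJavaOptions",
        "-XX:+UseG1GC -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")
      .getOrCreate()

    // ... define and start the streaming query here ...

    spark.stop()
  }
}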






Re: Unable to alter partition. The transaction for alter partition did not commit successfully.

2018-07-10 Thread Arun Hive
 I am reading data from Kafka topics using createStream and pushing it to Hive 
using dataframes. The job seems to run fine for 5-6 hours and then it 
fails with the exception quoted below. 
On Wednesday, May 30, 2018, 3:31:10 PM PDT, naresh Goud 
 wrote:  
 
 What are you doing? Please give more details about what you are doing. 
On Wed, May 30, 2018 at 12:58 PM Arun Hive  wrote:

 
Hi, 
While running my Spark job component I am getting the following exception. 
Requesting your help on this:
Spark core version - spark-core_2.10-2.1.1
Spark streaming version - spark-streaming_2.10-2.1.1
Spark hive version - spark-hive_2.10-2.1.1

2018-05-28 00:08:04,317  [streaming-job-executor-2] ERROR (Hive.java:1883) - 
org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter partition. 
The transaction for alter partition did not commit successfully.
 at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:573)
 at org.apache.hadoop.hive.ql.metadata.Hive.alterPartition(Hive.java:546)
 at org.apache.hadoop.hive.ql.metadata.Hive.alterPartitionSpec(Hive.java:1915)
 at org.apache.hadoop.hive.ql.metadata.Hive.getPartition(Hive.java:1875)
 at org.apache.hadoop.hive.ql.metadata.Hive.loadPartition(Hive.java:1407)
 at 
org.apache.hadoop.hive.ql.metadata.Hive.loadDynamicPartitions(Hive.java:1593)
 at sun.reflect.GeneratedMethodAccessor123.invoke(Unknown Source)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.spark.sql.hive.client.Shim_v1_2.loadDynamicPartitions(HiveShim.scala:831)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveClientImpl.scala:693)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$loadDynamicPartitions$1.apply(HiveClientImpl.scala:691)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl$$anonfun$withHiveState$1.apply(HiveClientImpl.scala:279)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.liftedTree1$1(HiveClientImpl.scala:226)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.retryLocked(HiveClientImpl.scala:225)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.withHiveState(HiveClientImpl.scala:268)
 at 
org.apache.spark.sql.hive.client.HiveClientImpl.loadDynamicPartitions(HiveClientImpl.scala:691)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply$mcV$sp(HiveExternalCatalog.scala:823)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog$$anonfun$loadDynamicPartitions$1.apply(HiveExternalCatalog.scala:811)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:97)
 at 
org.apache.spark.sql.hive.HiveExternalCatalog.loadDynamicPartitions(HiveExternalCatalog.scala:811)
 at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult$lzycompute(InsertIntoHiveTable.scala:319)
 at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.sideEffectResult(InsertIntoHiveTable.scala:221)
 at 
org.apache.spark.sql.hive.execution.InsertIntoHiveTable.doExecute(InsertIntoHiveTable.scala:407)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
 at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
 at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
 at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
 at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
 at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
 at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
 at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:263)
 at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:243)

...
 at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
 at 
org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)
 at 
org.apache.spark.streaming.dstream.DStream$$anonfun

Convert scientific notation DecimalType

2018-07-10 Thread dimitris plakas
Hello everyone,

I am new to PySpark and I am facing a problem casting some values to
DecimalType. To clarify my question, I present an example.

I have a dataframe in which I store my data, which consists of trajectories.
The dataframe looks like:

*Id | Trajectory*

id1 | [ [x1, y1, t1], [x2, y2, t2], ...[xn, yn, tn] ]
id2 | [ [x1, y1, t1], [x2, y2, t2], ...[xn, yn, tn] ]

So for each ID I have a trajectory, obtained by applying a group-by on a
previous dataframe, where:
x = lon
y = lat
t = time (in seconds).

When I store the data in the dataframe, the time is displayed in scientific
notation. Is there any way to convert t to DecimalType(10,0)?

I tried to convert it at the beginning, but I get an error when I try to
store the trajectories because the column Trajectory is
ArrayType(ArrayType(FloatType())).
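
A minimal sketch of one way this cast could be expressed (shown in Scala to
keep a single language across the examples in this digest; the cast string is
the same in PySpark), assuming the dataframe is available as a hypothetical
table named trajectories:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

object TrajectoryCastSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("trajectory-cast").getOrCreate()

    // Hypothetical source of the (Id, Trajectory) dataframe described above.
    val df = spark.table("trajectories")

    // Casting the nested array converts every element (x, y and t alike), so
    // decimal(10,0) is only appropriate if truncating x/y is acceptable;
    // otherwise pick a scale that preserves them, e.g. decimal(18,6).
    val casted = df.withColumn(
      "Trajectory",
      col("Trajectory").cast("array<array<decimal(10,0)>>"))

    casted.printSchema()
    spark.stop()
  }
}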


[Spark MLib]: RDD caching behavior of KMeans

2018-07-10 Thread mkhan37
Hi All,

I was varying the storage levels of RDD caching in a KMeans program
implemented using the MLlib library and got some very confusing and
interesting results. The base code of the application is from a benchmark
suite named SparkBench. I changed the storage level of the data RDD passed to
the KMeans train function, and MEMORY_AND_DISK_SER performs noticeably worse
than DISK_ONLY. MEMORY_AND_DISK performed the best, as expected. But why the
memory-serialized storage level performs worse than the disk-serialized level
is very confusing. I am using 1 master node and 4 slave nodes, with each
executor having a 48g JVM. The cached data should also fit within memory
easily.
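
Not an explanation of the anomaly, but for anyone reproducing it, a minimal
sketch of the experiment being described (the input path, k and the iteration
count are placeholders, not SparkBench's actual code):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.storage.StorageLevel

object KMeansStorageLevelSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("kmeans-storage-levels"))

    // Hypothetical input: one space-separated feature vector per line.
    val data = sc.textFile("hdfs:///tmp/kmeans-input")
      .map(line => Vectors.dense(line.trim.split(' ').map(_.toDouble)))

    // The storage level under test -- swap in DISK_ONLY, MEMORY_AND_DISK, etc.
    data.persist(StorageLevel.MEMORY_AND_DISK_SER)

    val model = KMeans.train(data, 10, 20) // k and maxIterations are arbitrary
    println(s"Within-cluster cost: ${model.computeCost(data)}")

    sc.stop()
  }
}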

If anyone has any idea or suggestion on why this behavior is happening
please let me know.

Regards,
Muhib 






Re: [Structured Streaming] Custom StateStoreProvider

2018-07-10 Thread subramgr
Hi, 

This *trait* looks very daunting. Is there a blog post or an article that
explains how to implement this *trait*?

Thanks
Girish






How Kryo serializer allocates buffer in Spark

2018-07-10 Thread nirav
I am getting the following error in a Spark task. The default max value is
64mb! The documentation says it should be large enough to store the largest
object in my application, but I don't think I have any object bigger than
64mb. So what do these values (spark.kryoserializer.buffer,
spark.kryoserializer.buffer.max) mean?

Is that one buffer per executor, or one per executor per core? I have 6 cores
per executor, so are all 6 writing to this common buffer? In that case I have
16mb of buffer per core. Please explain. Thanks!


Job aborted due to stage failure: Task 3 in stage 4.0 failed 10 times, most
recent failure: Lost task 3.9 in stage 4.0 (TID 16,
iadprd01mpr005.mgmt-a.xactly.iad.dc.local): org.apache.spark.SparkException:
*Kryo serialization failed: Buffer overflow. Available: 0, required: 19. To
avoid this, increase spark.kryoserializer.buffer.max value.*
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:300)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
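
Not an authoritative answer on the per-executor vs. per-core question (my
understanding is that the buffer belongs to each Kryo serializer instance,
roughly one per task thread, rather than being a single shared per-executor
buffer, but please verify), here is a minimal sketch of how the two settings
are typically raised; the 128m value is illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object KryoBufferConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("kryo-buffer-example")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Initial size of Kryo's serialization buffer (default 64k).
      .set("spark.kryoserializer.buffer", "64k")
      // Hard cap the buffer may grow to (default 64m); raise it when tasks
      // fail with "Kryo serialization failed: Buffer overflow".
      .set("spark.kryoserializer.buffer.max", "128m")

    val sc = new SparkContext(conf)
    // ... job code ...
    sc.stop()
  }
}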


Re: [Structured Streaming] Custom StateStoreProvider

2018-07-10 Thread Stefan Van Wouw
Hi Girish,

You can implement a custom state store provider by implementing the
StateStoreProvider and StateStore traits (
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
) and setting the corresponding Spark configuration:

spark.conf.set(
  "spark.sql.streaming.stateStore.providerClass",
  "com.example.path.to.CustomStateStore")


See also
https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
for
the default implementation that is used.
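
For anyone attempting this (e.g. the Cassandra-backed checkpointing discussed
in this thread), a rough and heavily hedged skeleton of a provider class --
every name below is hypothetical, and the method set reflects the traits as of
Spark 2.3/2.4, so check the linked StateStore.scala for the exact signatures
in the Spark version you run:

package com.example.state

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreConf, StateStoreId, StateStoreProvider}
import org.apache.spark.sql.types.StructType

class CassandraStateStoreProvider extends StateStoreProvider {

  private var id: StateStoreId = _

  override def init(
      stateStoreId: StateStoreId,
      keySchema: StructType,
      valueSchema: StructType,
      keyIndexOrdinal: Option[Int],
      storeConfs: StateStoreConf,
      hadoopConf: Configuration): Unit = {
    id = stateStoreId
    // Open connections to the external store, create tables, etc.
  }

  override def stateStoreId: StateStoreId = id

  override def close(): Unit = {
    // Release connections to the external store.
  }

  override def getStore(version: Long): StateStore = {
    // Return a StateStore implementation that reads and writes the key/value
    // state for the given version and commits it atomically.
    ???
  }
}

The class registered under spark.sql.streaming.stateStore.providerClass must
be a StateStoreProvider; each getStore call returns the StateStore that the
streaming aggregation actually reads and writes.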

Hope this helps!

Stefan


On Tue, Jul 10, 2018 at 7:06 AM subramgr 
wrote:

> Hi,
> Currently we are using HDFS for our checkpointing, but we are having issues
> maintaining an HDFS cluster.
>
> We tried GlusterFS in the past for checkpointing, but in our setup GlusterFS
> does not work well.
>
> We are evaluating Cassandra for storing the checkpoint data. Has anyone
> implemented a *StateStoreProvider*? Are there any blogs or articles that
> describe how to create our own *checkpointing* implementation?
>
> Thanks
>
>
>
>
--
Stefan van Wouw
Databricks Inc.
stefan.vanw...@databricks.com

databricks.com



Re: Dynamic allocation not releasing executors after unpersisting all cached data

2018-07-10 Thread Jeffrey Charles
Thanks for the suggestion. I gave it a try but the executor still isn't
being released several minutes after running that.
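
(For reference, a minimal sketch of the persist/unpersist cycle with the
blocking variant suggested below -- the dataframe itself is arbitrary:)

import org.apache.spark.sql.SparkSession

object UnpersistBlockingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("unpersist-blocking").getOrCreate()

    // Arbitrary dataframe standing in for the one whose size is being measured.
    val df = spark.range(0, 1000000L).toDF("id")

    df.persist()
    df.count() // materialize the cache so its size shows up in the storage tab

    // Synchronously drop all cached blocks before watching for idle executors.
    df.unpersist(blocking = true)

    spark.stop()
  }
}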

On Mon, Jul 9, 2018 at 3:51 PM Vadim Semenov  wrote:

> Try doing `unpersist(blocking=true)`
> On Mon, Jul 9, 2018 at 2:59 PM Jeffrey Charles
>  wrote:
> >
> > I'm persisting a dataframe in Zeppelin which has dynamic allocation
> enabled to get a sense of how much memory the dataframe takes up. After I
> note the size, I unpersist the dataframe. For some reason, Yarn is not
> releasing the executors that were added to Zeppelin. If I don't run the
> persist and unpersist steps, the executors that were added are removed
> about a minute after the paragraphs complete. Looking at the storage tab in
> the Spark UI for the Zeppelin job, I don't see anything cached.
> >
> > Is there any way to get Yarn to automatically remove executors after
> doing a persist followed by an unpersist if there is no activity on the
> executor within the configured dynamic allocation timeout (similar to how
> it works without a persist/unpersist cycle) without having to set
> spark.dynamicAllocation.cachedExecutorIdleTimeout? The main reason I'd like
> to avoid setting that configuration is that I do not want the executors to be
> reclaimed if they do have cached data.
>
>
>
> --
> Sent from my iPhone
>


Re: [SPARK on MESOS] Avoid re-fetching Spark binary

2018-07-10 Thread Mark Hamstra
It's been done many times before by many organizations. Use Spark Job
Server or Livy or create your own implementation of a similar long-running
Spark Application. Creating a new Application for every Job is not the way
to achieve low-latency performance.

On Tue, Jul 10, 2018 at 4:18 AM  wrote:

> Dear,
>
> Our jobs are triggered by users on demand,
> and each new job is submitted to the Spark server via a REST API. The 2-4
> seconds of latency is mainly due to the initialization of the SparkContext
> every time a new job is submitted, as you have mentioned.
>
> If you are aware of a way to avoid this initialization, could you please
> share it? That would be perfect for our case.
>
> Best
> Tien Dat
>
> 
> Essentially correct. The latency to start a Spark Job is nowhere close to
> 2-4 seconds under typical conditions. Creating a new Spark Application
> every time instead of running multiple Jobs in one Application is not going
> to lead to acceptable interactive or real-time performance, nor is that an
> execution model that Spark is ever likely to support in trying to meet
> low-latency requirements. As such, reducing Application startup time (not
> Job startup time) is not a priority.
>
> On Fri, Jul 6, 2018 at 4:06 PM Timothy Chen  wrote:
>
> > I know there have been some community efforts, shown at Spark Summits
> > before, mostly around reusing the same Spark context for multiple “jobs”.
> >
> > I don’t think reducing Spark job startup time is a community priority,
> > afaik.
> >
> > Tim
> > On Fri, Jul 6, 2018 at 7:12 PM Tien Dat  wrote:
> >
> >> Dear Timothy,
> >>
> >> It works like a charm now.
> >>
> >> BTW (don't judge me if I am too greedy :-)), the latency to start a Spark
> >> job is around 2-4 seconds, unless I am unaware of some awesome
> >> optimization in Spark. Do you know if the Spark community is working on
> >> reducing this latency?
> >>
> >> Best
> >>
> >>
> >>
>
> 
> Quoted from:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/SPARK-on-MESOS-Avoid-re-fetching-Spark-binary-tp32849p32865.html
>
>
>
>


Re: Spark on Mesos - Weird behavior

2018-07-10 Thread Thodoris Zois
Actually, after some experiments we figured out that spark.cores.max /
spark.executor.cores is an upper bound on the number of executors. Spark apps
will run even if only one executor can be launched.

Is there any way to also specify a lower bound? It is a bit annoying that we
seemingly cannot control the resource usage of an application. By the way,
we are not using dynamic allocation.

- Thodoris 
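
For illustration, a minimal sketch of the settings being discussed (the values
mirror the example in the quoted thread; whether a lower bound on executors
can be enforced is exactly the open question):

import org.apache.spark.{SparkConf, SparkContext}

object MesosExecutorSizingSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("mesos-executor-sizing")
      // In coarse-grained Mesos mode, cores.max / executor.cores caps the
      // number of executors from above: 30 / 10 => at most 3 executors.
      .set("spark.cores.max", "30")
      .set("spark.executor.cores", "10")
      .set("spark.executor.memory", "2g")

    val sc = new SparkContext(conf)
    // ... job code; fewer than 3 executors may still be launched if the
    // offers Mesos makes cannot fit 10-core executors ...
    sc.stop()
  }
}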


> On 10 Jul 2018, at 14:35, Pavel Plotnikov  
> wrote:
> 
> Hello Thodoris!
> Have you checked the following:
>  - Does the Mesos cluster have available resources?
>  - Does Spark have tasks waiting in the queue for longer than the
> spark.dynamicAllocation.schedulerBacklogTimeout configuration value?
>  - Have you checked that Mesos sends offers to the Spark framework with at
> least 10 cores and 2GB of RAM?
> 
> If Mesos has no available offers with 10 cores but does have offers with,
> say, 8 or 9 cores, you can use smaller executors (for example 4 cores and
> 1GB of RAM) to better fit the resources available on the nodes.
> 
> Cheers,
> Pavel
> 
>> On Mon, Jul 9, 2018 at 9:05 PM Thodoris Zois  wrote:
>> Hello list,
>> 
>> We are running Apache Spark on a Mesos cluster and we see weird executor 
>> behavior. When we submit an app with e.g. 10 cores and 2GB of memory per 
>> executor and max cores 30, we expect to see 3 executors running on the 
>> cluster. However, sometimes there are only 2... Spark applications are not 
>> the only ones running on the cluster. I guess that Spark starts executors 
>> on the available offers even if they do not satisfy our needs. Is there any 
>> configuration we can use to prevent Spark from starting when there are no 
>> resource offers for the total number of executors?
>> 
>> Thank you 
>> - Thodoris 
>> 


[Structured Streaming] Custom StateStoreProvider

2018-07-10 Thread subramgr
Hi, 
Currently we are using HDFS for our checkpointing, but we are having issues
maintaining an HDFS cluster.

We tried GlusterFS in the past for checkpointing, but in our setup GlusterFS
does not work well.

We are evaluating Cassandra for storing the checkpoint data. Has anyone
implemented a *StateStoreProvider*? Are there any blogs or articles that
describe how to create our own *checkpointing* implementation?

Thanks







Re: Emit Custom metrics in Spark Structured Streaming job

2018-07-10 Thread subramgr
Hi, 

I am working on implementing my idea, but here is how it goes (a minimal
sketch of step 2 follows below):

1. Use this library: https://github.com/groupon/spark-metrics
2. Have a cron job which periodically curls the /metrics/json endpoint on the
driver and all other nodes.
3. Parse the response and send the data through a Telegraf agent installed
on the node.
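
A minimal sketch of step 2, assuming the metrics servlet is reachable on the
driver UI port (the host, port and the hand-off to Telegraf are assumptions):

import scala.io.Source

object MetricsPollerSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical driver host; the /metrics/json servlet is served from the
    // driver's web UI when the default metrics configuration is in place.
    val url = "http://driver-host:4040/metrics/json"
    val json = Source.fromURL(url).mkString

    // Step 3 would parse this JSON and hand the values to the local Telegraf
    // agent (e.g. via its HTTP listener or a file it tails).
    println(json)
  }
}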






[ANNOUNCE] Apache Spark 2.2.2

2018-07-10 Thread Tom Graves
We are happy to announce the availability of Spark 2.2.2!
Apache Spark 2.2.2 is a maintenance release, based on the branch-2.2 
maintenance branch of Spark. We strongly recommend that all 2.2.x users 
upgrade to this stable release. The release notes are available at 
http://spark.apache.org/releases/spark-release-2-2-2.html

To download Apache Spark 2.2.2, visit http://spark.apache.org/downloads.html. 
This version of Spark is also available on Maven and PyPI.
We would like to acknowledge all community members for contributing patches to 
this release.



Unpivoting

2018-07-10 Thread amin mohebbi
Does anyone know how to transpose columns in Spark (Scala)? This is how I want
to unpivot the table, as in the linked question "How to unpivot the table
based on the multiple columns":

I am using Scala and Spark to unpivot a table which looks like as below:
+---+--+-+-+-...

Re: Spark on Mesos - Weird behavior

2018-07-10 Thread Pavel Plotnikov
Hello Thodoris!
Have you checked the following:
 - Does the Mesos cluster have available resources?
 - Does Spark have tasks waiting in the queue for longer than the
spark.dynamicAllocation.schedulerBacklogTimeout configuration value?
 - Have you checked that Mesos sends offers to the Spark framework with at
least 10 cores and 2GB of RAM?

If Mesos has no available offers with 10 cores but does have offers with, say,
8 or 9 cores, you can use smaller executors (for example 4 cores and 1GB of
RAM) to better fit the resources available on the nodes.

Cheers,
Pavel

On Mon, Jul 9, 2018 at 9:05 PM Thodoris Zois  wrote:

> Hello list,
>
> We are running Apache Spark on a Mesos cluster and we see weird executor
> behavior. When we submit an app with e.g. 10 cores and 2GB of memory per
> executor and max cores 30, we expect to see 3 executors running on the
> cluster. However, sometimes there are only 2... Spark applications are not
> the only ones running on the cluster. I guess that Spark starts executors on
> the available offers even if they do not satisfy our needs. Is there any
> configuration we can use to prevent Spark from starting when there are no
> resource offers for the total number of executors?
>
> Thank you
> - Thodoris
>


Re: Emit Custom metrics in Spark Structured Streaming job

2018-07-10 Thread chandan prakash
Hi Subramanian,
Did you find any solution for this ?
I am looking for something similar too.

Regards,
Chandan

On Wed, Jun 27, 2018 at 9:47 AM subramgr 
wrote:

> I am planning to send these metrics to our KairosDB. Let me know if there
> are any examples that I can take a look at.
>
>
>

-- 
Chandan Prakash