Re: Use Arrow instead of Pickle without pandas_udf

2018-07-30 Thread Bryan Cutler
Here is a link to the JIRA for adding StructType support for scalar
pandas_udf https://issues.apache.org/jira/browse/SPARK-24579


On Wed, Jul 25, 2018 at 3:36 PM, Hichame El Khalfi 
wrote:

> Hey Holden,
> Thanks for your reply,
>
> We are currently using a Python function that produces a Row(TS=LongType(),
> bin=BinaryType()).
> We use this function like this: dataframe.rdd.map(my_function)
> .toDF().write.parquet()
>
> To reuse it in pandas_udf, we changed the return type to
> StructType(StructField(LongType), StructField(BinaryType)).
>
> 1) But we face an issue: StructType is not supported by pandas_udf.
>
> So I was wondering whether we could still use dataframe.rdd.map but get an
> improvement in serialization by using the Arrow format instead of Pickle.
>
> *From:* hol...@pigscanfly.ca
> *Sent:* July 25, 2018 4:41 PM
> *To:* hich...@elkhalfi.com
> *Cc:* user@spark.apache.org
> *Subject:* Re: Use Arrow instead of Pickle without pandas_udf
>
> Not currently. What's the problem with pandas_udf for your use case?
>
> On Wed, Jul 25, 2018 at 1:27 PM, Hichame El Khalfi 
> wrote:
>
>> Hi There,
>>
>>
>> Is there a way to use Arrow format instead of Pickle but without using
>> pandas_udf ?
>>
>>
>> Thanks for your help,
>>
>>
>> Hichame
>>
>
>
>
> --
> Twitter: https://twitter.com/holdenkarau
>


Executor lost for unknown reasons error Spark 2.3 on kubernetes

2018-07-30 Thread purna pradeep
Hello,



I’m getting the error below in the Spark driver pod logs: executor pods are
getting killed midway through while the job is running, and even the driver
pod terminates with the intermittent error below. This happens if I run
multiple jobs in parallel.



I am not able to see the executor logs because the executor pods are killed.



org.apache.spark.SparkException: Job aborted due to stage failure: Task 23
in stage 36.0 failed 4 times, most recent failure: Lost task 23.3 in stage
36.0 (TID 1006, 10.10.125.119, executor 1): ExecutorLostFailure (executor 1
exited caused by one of the running tasks) Reason: Executor lost for
unknown reasons.

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:194)
    ... 42 more



Re: How to Create one DB connection per executor and close it after the job is done?

2018-07-30 Thread Vadim Semenov
object MyDatabseSingleton {
  @transient
  lazy val dbConn = DB.connect(…)
}

`transient` marks the variable to be excluded from serialization,

and `lazy` opens the connection only when it's first needed, and also makes
sure that the val is initialized in a thread-safe way.

http://fdahms.com/2015/10/14/scala-and-the-transient-lazy-val-pattern/
http://code-o-matic.blogspot.com/2009/05/double-checked-locking-idiom-sweet-in.html
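
For illustration, here is a minimal sketch of the pattern wired up to a plain
JDBC connection (the URL, credentials and the DataFrame `df` are placeholders;
any client with a connect/close API would work the same way):

import java.sql.{Connection, DriverManager}

object MyDatabseSingleton {
  // opened lazily, at most once per executor JVM; excluded from task serialization
  @transient
  lazy val dbConn: Connection =
    DriverManager.getConnection("jdbc:postgresql://dbhost:5432/mydb", "user", "pass")

  // can be called from a shutdown hook on the executor once all work is done
  def close(): Unit = dbConn.close()
}

// every task that runs on the same executor reuses the same connection
df.rdd.foreachPartition { rows =>
  val conn = MyDatabseSingleton.dbConn
  rows.foreach { row =>
    // ... write `row` through `conn`
  }
}
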
On Mon, Jul 30, 2018 at 1:32 PM kant kodali  wrote:
>
> Hi Patrick,
>
> This object must be serializable right? I wonder if I will access to this 
> object in my driver(since it is getting created on the executor side) so I 
> can close when I am done with my batch?
>
> Thanks!
>
> On Mon, Jul 30, 2018 at 7:37 AM, Patrick McGloin  
> wrote:
>>
>> You could use an object in Scala, of which only one instance will be created 
>> on each JVM / Executor. E.g.
>>
>> object MyDatabseSingleton {
>> var dbConn = ???
>> }
>>
>> On Sat, 28 Jul 2018, 08:34 kant kodali,  wrote:
>>>
>>> Hi All,
>>>
>>> I understand creating a connection forEachPartition but I am wondering can 
>>> I create one DB connection per executor and close it after the job is done? 
>>> any sample code would help. you can imagine I am running a simple batch 
>>> processing application.
>>>
>>> Thanks!
>
>


-- 
Sent from my iPhone




Re: How to Create one DB connection per executor and close it after the job is done?

2018-07-30 Thread kant kodali
Hi Patrick,

This object must be serializable, right? I also wonder whether I will have
access to this object in my driver (since it is getting created on the
executor side) so I can close it when I am done with my batch?

Thanks!

On Mon, Jul 30, 2018 at 7:37 AM, Patrick McGloin 
wrote:

> You could use an object in Scala, of which only one instance will be
> created on each JVM / Executor. E.g.
>
> object MyDatabseSingleton {
> var dbConn = ???
> }
>
> On Sat, 28 Jul 2018, 08:34 kant kodali,  wrote:
>
>> Hi All,
>>
>> I understand creating a connection forEachPartition but I am wondering
>> can I create one DB connection per executor and close it after the job is
>> done? any sample code would help. you can imagine I am running a simple
>> batch processing application.
>>
>> Thanks!
>>
>


sorting on dataframe causes out of memory (java heap space)

2018-07-30 Thread msbreuer
While working with larger datasets I run into out-of-memory issues.
Basically, a Hadoop sequence file is read, its contents are sorted and a
Hadoop map file is written back. The code works fine for workloads greater
than 20GB. Then I changed one column in my dataset to store a large
object, and the size of a row object increased from 20KB to about 4MB. Now
the same code runs into Java heap space issues and the application is shut
down with an out-of-memory exception.

It seems DataFrame sort operations cannot handle large objects. I took a
heap dump and saw a large array-of-arrays. I would expect such an object
when using the collect() operation, where single task results are collected
into one large array. I know groupBy and collect() operations will cause
such problems on large datasets, but I expected a single sort should not
run into such issues. I switched from sort() to sortWithinPartitions()
and the application did not crash. Of course, the result is not the same.
But shouldn't a simple sort() just work?

I created a simple test program which blows up a tiny Int RDD into a
Row RDD with such large objects, and found out that spilling to disk
does not seem to work out of the box. The defaults and any MEMORY* storage
level run into this issue; only DISK_ONLY works, but it is very slow.

I posted a question with example code on Stack Overflow:
https://stackoverflow.com/questions/51546921/apache-spark-dataframe-causes-out-of-memory
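
For what it's worth, the kind of test program described above boils down to
roughly the following sketch (row count, payload size and output path are
made up):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("large-row-sort").getOrCreate()
import spark.implicits._

// blow a small Int range up into rows that each carry a multi-megabyte payload
val largeRows = spark.sparkContext
  .parallelize(1 to 10000)
  .map(i => (i, new Array[Byte](4 * 1024 * 1024)))
  .toDF("key", "payload")

// the global sort is the step that exhausts the heap once rows get this large
largeRows.sort($"key".desc).write.parquet("/tmp/sorted-large-rows")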

My question to the community is: how can a growing amount of data be sorted
without increasing the heap size?

I found out the following facts:

  * larger datasets require setting spark.driver.maxResultSize to a greater
    value, or 0 for no limit (see the config sketch after this list)
  * row object size seems to impact memory usage
  * the G1 garbage collector may run into fragmentation issues for large
    objects, so I used the parallel GC instead. In my case this made no
    difference; after processing n tasks the heap runs full
  * reducing driver and executor memory has no effect, the heap always
    fills up in the same way
  * persisting with a DISK* storage level is no guarantee that Spark spills
    data to disk
  * using the Kryo serializer has little effect in my case; a few more tasks
    finish before the OOM occurs
  * sortWithinPartitions works, but afterwards only the contents of the
    partitions are sorted
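
For reference, the knobs mentioned above map to roughly the following
(values are examples, not recommendations, and `someLargeDf` stands in for
the real DataFrame):

import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

val spark = SparkSession.builder()
  .config("spark.driver.maxResultSize", "0")  // 0 = no limit
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .getOrCreate()

// DISK_ONLY was the only level that avoided the OOM here, at the cost of speed
someLargeDf.persist(StorageLevel.DISK_ONLY)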

I assume sortWithinPartitions followed by a merge of the shuffled partitions
should be enough to sort the final result. But why does Spark combine the
whole result set on the driver? That is not very scalable.

So I dropped the large column, let Spark sort the other columns, and finally
did a left join to combine the large data back. The code runs without an
OOM, but the left join loses the sort order. Any ideas?

My latest tests are not finished yet, but sorting on an RDD instead of a
DataFrame seems to work better. The coding is more complex, and I suspect
the Catalyst optimizer for DataFrames does not choose optimal settings.

How can growing datasets be sorted without increasing memory? Is my code at
fault, or is it just a Spark bug?

My setup is:

- Windows 10, Java 1.8u144 (u171) with -Xms5g -Xmx5g and optionally
  -XX:+UseParallelOldGC

- Spark 2.3.1 in local mode (running as a single-node cluster on my
  workstation)








Re: Kafka backlog - spark structured streaming

2018-07-30 Thread Arun Mahadevan
Here's a proposal to add one: https://github.com/apache/spark/pull/21819

It's always good to set "maxOffsetsPerTrigger" unless you want Spark to
process till the end of the stream in each micro batch. Even without
"maxOffsetsPerTrigger" the lag can be non-zero by the time the micro batch
completes.
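
For example, a rough sketch of setting it on the Kafka source (broker, topic
and the `spark` session are placeholders):

val stream = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")
  .option("subscribe", "events")
  .option("maxOffsetsPerTrigger", 10000L)  // upper bound on offsets consumed per micro batch
  .load()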

On 30 July 2018 at 08:50, Burak Yavuz  wrote:

> If you don't set rate limiting through `maxOffsetsPerTrigger`, Structured
> Streaming will always process until the end of the stream. So number of
> records waiting to be processed should be 0 at the start of each trigger.
>
> On Mon, Jul 30, 2018 at 8:03 AM, Kailash Kalahasti <
> kailash.kalaha...@gmail.com> wrote:
>
>> Is there any way to find out backlog on kafka topic while using spark
>> structured streaming ? I checked few consumer apis but that requires to
>> enable groupid for streaming, but seems it is not allowed.
>>
>> Basically i want to know number of records waiting to be processed.
>>
>> Any suggestions ?
>>
>
>


Re: Kafka backlog - spark structured streaming

2018-07-30 Thread Burak Yavuz
If you don't set rate limiting through `maxOffsetsPerTrigger`, Structured
Streaming will always process until the end of the stream. So number of
records waiting to be processed should be 0 at the start of each trigger.

On Mon, Jul 30, 2018 at 8:03 AM, Kailash Kalahasti <
kailash.kalaha...@gmail.com> wrote:

> Is there any way to find out backlog on kafka topic while using spark
> structured streaming ? I checked few consumer apis but that requires to
> enable groupid for streaming, but seems it is not allowed.
>
> Basically i want to know number of records waiting to be processed.
>
> Any suggestions ?
>


Kafka backlog - spark structured streaming

2018-07-30 Thread Kailash Kalahasti
Is there any way to find out the backlog on a Kafka topic while using Spark
Structured Streaming? I checked a few consumer APIs, but they require
enabling a group id for the stream, which it seems is not allowed.

Basically I want to know the number of records waiting to be processed.

Any suggestions ?


Re: How to Create one DB connection per executor and close it after the job is done?

2018-07-30 Thread Patrick McGloin
You could use an object in Scala, of which only one instance will be
created on each JVM / Executor. E.g.

object MyDatabseSingleton {
var dbConn = ???
}

On Sat, 28 Jul 2018, 08:34 kant kodali,  wrote:

> Hi All,
>
> I understand creating a connection forEachPartition but I am wondering can
> I create one DB connection per executor and close it after the job is done?
> any sample code would help. you can imagine I am running a simple batch
> processing application.
>
> Thanks!
>


Re: How to reduceByKeyAndWindow in Structured Streaming?

2018-07-30 Thread oripwk
Thanks guys, it really helps.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




Using Spark Streaming for analyzing changing data

2018-07-30 Thread oripwk


We have a use case with a stream of events, where every event has an ID and
its current state with a timestamp:

…
111,ready,1532949947
111,offline,1532949955
111,ongoing,1532949955
111,offline,1532949973
333,offline,1532949981
333,ongoing,1532949987
…

We want to ask questions about the current state of the *whole dataset*,
from the beginning of time, such as:
  "how many items are now in ongoing state"

(but bear in mind that there are more complicated questions, and all of them
are asking about the _current_ state of the dataset, from the beginning of
time)

I haven't found any simple, performant way of doing it.

The ways I've found are:
1. Using mapGroupsWithState, where I groupByKey on the ID and always keep as
state the latest event by timestamp
2. Using groupByKey on the ID and keeping only the event whose timestamp is
the latest

Both methods are not good because the first one involves state which means
checkpointing, memory, etc., and the second involves shuffling and sorting.
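
For reference, option 1 comes out roughly like the sketch below (the Event
type, session and source are placeholders, and the checkpointing and memory
concerns above still apply):

import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout}

case class Event(id: String, state: String, ts: Long)

val spark: SparkSession = ???      // your existing session
import spark.implicits._
val events: Dataset[Event] = ???   // parsed from the incoming event stream

// keep only the most recent event seen so far for every ID
val latestPerId = events
  .groupByKey(_.id)
  .mapGroupsWithState(GroupStateTimeout.NoTimeout) {
    (id: String, updates: Iterator[Event], groupState: GroupState[Event]) =>
      val newest = (groupState.getOption.toSeq ++ updates).maxBy(_.ts)
      groupState.update(newest)
      newest
  }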

We will have a lot of such queries in order to populate a real-time
dashboard.

I wonder, as a general question, what is the correct way to process this
type of data in Spark Streaming?




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/




How to add a new source to existing struct streaming application, like a kafka source

2018-07-30 Thread 杨浩
How can I add a new source, such as a Kafka source, to an existing Structured
Streaming application?


How to read csv in dataframe

2018-07-30 Thread Lehak Dharmani
I am trying to read a CSV file into a Spark DataFrame. My OS is Ubuntu 18.04,
Spark version 2.3.1, Python version 2.7.15.
My code:

from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

conf = SparkConf()
sc = SparkContext(conf=conf)
spark = SparkSession.builder.config(conf=conf).appName('FinancialRecon').getOrCreate()
sqlContext = SQLContext(sc)
df = spark.read.csv('/home/iconnect4/finrecon/test2.csv')
df.show()

Error:

Traceback (most recent call last):
  File "/home/iconnect4/finrecon/scratch_3.py", line 43, in <module>
    df=sqlContext.read.csv('/home/iconnect4/finrecon/test2.csv')
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 441, in csv
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 79, in deco
pyspark.sql.utils.IllegalArgumentException: 'Illegal pattern component: XXX'

Please help





--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
