Re: Spark AsyncEventQueue doubt

2018-05-28 Thread Yuanjian Li
Hi Aakash,

The event-dropping problem can be triggered by a slow listener, a large number of 
events, or both. The easy and simple fix is to raise the config 
`spark.scheduler.listenerbus.eventqueue.capacity`; its default value is 10000. 
But if the drops still happen after changing the queue capacity to a larger value, 
you should look more closely at the listener itself.
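
For example, raising the capacity when building the SparkConf looks roughly like this 
(the value 20000 below is only an illustration, not a tuned recommendation):

import org.apache.spark.SparkConf

// Sketch: raise the listener-bus event queue capacity above the 10000 default.
// The value 20000 is only an example.
val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.scheduler.listenerbus.eventqueue.capacity", "20000")

// The same setting can also be passed on the command line:
//   spark-submit --conf spark.scheduler.listenerbus.eventqueue.capacity=20000 ...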


> On 27 May 2018, at 15:45, Aakash Basu  wrote:
> 
> Hi,
> 
> I'm getting the below ERROR and WARN when running a little heavy calculation 
> on a dataset -
> 
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 2018-05-27 12:51:11 ERROR AsyncEventQueue:70 - Dropping event from queue 
> appStatus. This likely means one of the listeners is too slow and cannot keep 
> up with the rate at which tasks are being started by the scheduler.
> 2018-05-27 12:51:11 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Thu Jan 01 
> 05:30:00 IST 1970.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 12:52:14 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 12:51:11 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 12:53:14 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 12:52:14 IST 2018.
> 2018-05-27 12:54:14 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 12:53:14 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 12:55:14 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 12:54:14 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 12:56:15 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 12:55:14 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 12:57:32 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 12:56:15 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 12:58:32 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 12:57:32 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 12:59:33 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 12:58:32 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 13:00:34 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 12:59:33 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 13:01:35 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 13:00:34 IST 2018.
> [Stage 7784:>(9 + 3) / 34][Stage 7786:(53 + 38) / 200][Stage 7789:(74 + 53) / 
> 200]2018-05-27 13:02:36 WARN  AsyncEventQueue:66 - Dropped 
> com.codahale.metrics.Counter@1d8423d1 events from appStatus since Sun May 27 
> 13:01:35 IST 2018.
> 
> Even though my job is not failing, why am I getting these?
> 
> Thanks,
> Aakash.



Pandas UDF for PySpark error. Big Dataset

2018-05-28 Thread Traku traku
Hi.

I'm trying to use the new feature but I can't use it with a big dataset
(about 5 million rows).

I tried increasing executor memory, driver memory, and the number of partitions,
but no solution has helped me solve the problem.

One of the executor tasks keeps increasing its shuffle memory until it fails.

The error is generated by Arrow: unable to expand the buffer.

Any idea?


trying to understand structured streaming aggregation with watermark and append outputmode

2018-05-28 Thread Koert Kuipers
hello all,
just playing with structured streaming aggregations for the first time.
this is my little program i run inside sbt:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", )
  .load()

val query = lines
  .withColumn("time", current_timestamp)
  .withWatermark("time", "1 second")
  .groupBy(window($"time", "1 second")).agg(collect_list("value") as
"value")
  .withColumn("windowstring", $"window" as "string")
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .start()

query.awaitTermination()

before i start it i create a little server with nc:
$ nc -lk 

after it starts i simply type in a single character every 20 seconds or so
inside nc and hit enter. my characters are 1, 2, 3, etc.

the thing i don't understand is that it comes back with the correct responses,
but with delays in terms of entries (not time). after the first 2
characters it comes back with empty aggregations, and then for every next
character it comes back with the response for 2 characters ago. so when i
hit 3 it comes back with the response for 1.

not very realtime :(

any idea why?

i would like it to respond to my input 1 with the relevant response
for that input (after the window and watermark have expired, of course, so
within 2 seconds).

i tried adding a trigger of 1 second but that didn't help either.
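
for reference, a processing-time trigger would be wired into a query of this shape
roughly as in the sketch below (the exact placement in my chain is an assumption,
and it reuses the same imports/implicits and the `lines` stream from the snippet above):

import org.apache.spark.sql.streaming.Trigger

// sketch only: the same aggregation with a 1-second processing-time trigger attached
val triggeredQuery = lines
  .withColumn("time", current_timestamp())
  .withWatermark("time", "1 second")
  .groupBy(window($"time", "1 second")).agg(collect_list("value") as "value")
  .writeStream
  .format("console")
  .outputMode(OutputMode.Append)
  .trigger(Trigger.ProcessingTime("1 second"))
  .start()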

below is the output with my inputs inserted using '<= ', so '<= 1'
means i hit 1 and then enter.


<= 1
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+------------+
|window|value|windowstring|
+------+-----+------------+
+------+-----+------------+

<= 2
-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+------------+
|window|value|windowstring|
+------+-----+------------+
+------+-----+------------+

<= 3
-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:00...|  [1]|[2018-05-28 18:00...|
+--------------------+-----+--------------------+

<= 4
-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:00...|  [2]|[2018-05-28 18:00...|
+--------------------+-----+--------------------+

<= 5
-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----+--------------------+
|              window|value|        windowstring|
+--------------------+-----+--------------------+
|[2018-05-28 18:01...|  [3]|[2018-05-28 18:01...|
+--------------------+-----+--------------------+


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

2018-05-28 Thread Saulo Sobreiro
Hi,
I ran a few more tests and found that even with a lot more operations on the 
Scala side, Python is outperformed...

Dataset stream duration: ~3 minutes (CSV-formatted data messages read from 
Kafka)
Scala process/store time: ~3 minutes (map with split + metrics calculations + 
store raw + store metrics)
Python process/store time: ~7 minutes (map with split + store raw)

This is the difference between being usable in production or not. I get that 
Python is likely to be slower because of the Python-to-Java object 
transformations, but I was not expecting such a huge difference.

These results are very interesting, as I was comparing against the time an 
"equivalent" application in Storm takes to process the exact same stream (~3 
minutes as well) for the same results, and Spark was clearly losing the race.

Thank you all for your feedback :)

Regards,
Saulo

On 21/05/2018 14:09:40, Russell Spitzer  wrote:

The answer is most likely that when you use cross Java/Python code you incur 
a penalty for every object that you transform from a Java object into a Python 
object (and then back again into a Java object) when data is being passed into 
and out of your functions. A way around this would probably be to have used the 
DataFrame API if possible, which would have compiled the interactions in Java 
and skipped the Python-Java serialization. Using Scala from the start, though, is a 
great idea. I would also probably remove the cache from your stream, since that 
is probably only hurting (adding an additional serialization which is only used 
once).
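
For illustration, a DataFrame-level write through the connector's Cassandra source 
looks roughly like the sketch below (shown in Scala for brevity; the PySpark DataFrame 
API mirrors it). The keyspace/table names are taken from the snippets later in this 
thread, everything else is assumed:

import org.apache.spark.sql.SparkSession

// Sketch only, assuming the Spark Cassandra Connector is on the classpath and
// spark.cassandra.connection.host is configured. Keyspace/table names are placeholders
// borrowed from this thread.
val spark = SparkSession.builder().appName("df-to-cassandra").getOrCreate()
val df = spark.read.json("events.json")  // any DataFrame, e.g. parsed Kafka payloads

df.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "hdpkns", "table" -> "batch_measurement"))
  .mode("append")
  .save()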

On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman <alons...@gmail.com> wrote:
The main language Spark is developed in is Scala, so all the new features 
go first to Scala, then Java, and finally Python. I'm not surprised by the results; 
we've seen this at Stratio since the first versions of Spark. At the beginning of 
development some of our engineers build the prototype in Python, but when it 
comes down to it, if it goes into production, it has to be rewritten in Scala 
or Java, usually Scala.



On Mon, 21 May 2018 at 4:34, Saulo Sobreiro 
<saulo.sobre...@outlook.pt> wrote:
Hi Javier,

Thank you a lot for the feedback.
Indeed the CPU is a huge limitation. I had a lot of trouble trying to run this 
use case in yarn-client mode. I managed to run it only in standalone (local 
master) mode.

I do not have the hardware available to run this setup in a cluster yet, so I 
decided to dig a little deeper into the implementation to see what I could 
improve. I just finished evaluating some results.
If you find something wrong or odd please let me know.

Following your suggestion to use "saveToCassandra" directly, I decided to try 
Scala. Everything was implemented in the most similar way possible, and I was 
surprised by the results. The Scala implementation is much faster.

My current implementation is slightly different from the Python code shared 
a few emails ago, but to compare the languages' influence in the most comparable 
way I used the following snippets:

# Scala implementation --

val kstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))

kstream
  .map(x => parse(x.value))
  .saveToCassandra("hdpkns", "batch_measurement")

# Python implementation 
# Adapted from the previously shared code. However, instead of calculating the
# metrics, it just parses the messages.

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic],
                                            {"metadata.broker.list": brokers})

kafkaStream \
    .transform(parse) \
    .foreachRDD(casssave)


For the same streaming input, the Scala app took an average of ~1.5 seconds to 
handle each event. The Python implementation took an average of 
~80 seconds to handle each event (and that was after sorting out a lot of pickle 
concurrent-access issues).

Note that I measured the time as the difference between the event generation 
(before being published to Kafka) and the moment just before the 
saveToCassandra call.

The problem in the Python implementation seems to be the delay 
introduced by the foreachRDD(casssave) call, which only runs 
rdd.saveToCassandra("test_hdpkns", "measurement").


Honestly, I was not expecting such a difference between these two implementations... 
Do you understand why this is happening?



Again, Thank you very much for your help,

Best Regards


Sharing my current Scala code below
# Scala Snippet =
val sparkConf = new SparkConf() // ...
val ssc = new StreamingContext(sparkConf, Seconds(1))
val sc = ssc.sparkContext
//...
val kstream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
//...
// handle Kafka 

Error on fetching mass data from Cassandra using SparkSQL

2018-05-28 Thread Soheil Pourbafrani
I tried to fetch some data from Cassandra using Spark SQL. For small tables
everything goes well, but when trying to fetch data from big tables I get the
following error:

java.lang.NoSuchMethodError:
com.datastax.driver.core.ResultSet.fetchMoreResults()Lshade/com/datastax/spark/connector/google/common/util/concurrent/ListenableFuture;

I tried many versions of the Spark Cassandra connector (2.0.5, 2.0.8, 2.3.0), and
even the unshaded version, but no difference!

In my project I have the Flink Cassandra connector 1.4.2 as a Maven dependency, and
it uses the DataStax Cassandra core driver, too. I removed that dependency, but no
difference!

I also read this post on Stack Overflow, but their solution didn't work for me.

My Cassandra version is 3.11 and I use Spark 2.2.1 in local mode.

How can I solve the problem?
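
For debugging this kind of NoSuchMethodError, a small sketch like the one below, run 
inside the Spark application, prints which jar the conflicting class is actually loaded 
from at runtime; that often shows whether a shaded and an unshaded driver are being 
mixed on the classpath (the check itself is an illustration, not from the original post):

// Sketch: locate the jar that actually provides the driver's ResultSet class at runtime.
val source = classOf[com.datastax.driver.core.ResultSet]
  .getProtectionDomain
  .getCodeSource

println(if (source == null) "loaded from bootstrap/unknown" else source.getLocation.toString)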


Name error when writing data as orc

2018-05-28 Thread JF Chen
I am writing a dataset in ORC format to HDFS, and I ran into the
following problem:

Error: name expected at the position 1473 of
'string:boolean:string:string..zone:struct<$ref:string> ...' but '$'
is found.

where position 1473 is at the "$ref:string" part.
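
A hedged sketch of one way around this, assuming the offending field is a nested 
"$ref" inside a struct column; the column names below are guesses based on the 
error text, and `df` stands for the dataset being written:

import org.apache.spark.sql.functions._

// Sketch only: rebuild the struct so the nested "$ref" field gets an ORC-friendly
// name before writing. "zone" and "$ref" are taken from the error message; any other
// fields of that struct would need to be carried over the same way, and the output
// path is a placeholder.
val cleaned = df.withColumn(
  "zone",
  struct(col("zone").getField("$ref").as("ref"))
)
cleaned.write.orc("hdfs:///path/to/output")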




Regards,
Junfeng Chen


Re: Spark Structured Streaming is giving error “org.apache.spark.sql.AnalysisException: Inner join between two streaming DataFrames/Datasets is not supported;”

2018-05-28 Thread Jacek Laskowski
Hi,

Once you leave Spark Structured Streaming, i.e. right after you generate RDDs
(for your streaming queries), you can do any kind of "joins". You're back
in the good old days of RDD programming (with all the bells and whistles).

Please note that Spark Structured Streaming != Spark Streaming, since the
former uses the Dataset API while the latter uses the RDD API.

Don't touch RDD API and Spark Streaming unless you know what you're doing :)
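
A minimal sketch of what "any kind of joins" means at that level (the types and names 
below are purely illustrative, not taken from the thread):

import org.apache.spark.rdd.RDD

// Illustrative only: once you hold plain pair RDDs, the classic join applies,
// with none of the streaming-join restrictions.
def joinRdds(
    left: RDD[(String, Int)],
    right: RDD[(String, String)]): RDD[(String, (Int, String))] =
  left.join(right)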

Pozdrawiam,
Jacek Laskowski

https://about.me/JacekLaskowski
Mastering Spark SQL https://bit.ly/mastering-spark-sql
Spark Structured Streaming https://bit.ly/spark-structured-streaming
Mastering Kafka Streams https://bit.ly/mastering-kafka-streams
Follow me at https://twitter.com/jaceklaskowski

On Tue, May 15, 2018 at 5:36 PM, ☼ R Nair (रविशंकर नायर) <
ravishankar.n...@gmail.com> wrote:

> Hi Jacek,
>
> If we use RDDs instead of DataFrames, can we accomplish the same? I mean, is
> joining between RDDs allowed in Spark Streaming?
>
> Best,
> Ravi
>
> On Sun, May 13, 2018 at 11:18 AM Jacek Laskowski  wrote:
>
>> Hi,
>>
>> The exception message should be self-explanatory and says that you cannot
>> join two streaming Datasets. This feature was added in 2.3 if I'm not
>> mistaken.
>>
>> Just to be sure that you work with two streaming Datasets, can you show
>> the query plan of the join query?
>>
>> Jacek
>>
>> On Sat, 12 May 2018, 16:57 ThomasThomas,  wrote:
>>
>>> Hi There,
>>>
>>> Our use case is like this.
>>>
>>> We have nested (multiple) JSON messages flowing through a Kafka queue. We read
>>> the messages from Kafka using Spark Structured Streaming (SSS), explode the data,
>>> flatten everything into a single record using DataFrame joins, and land it into a
>>> relational database table (DB2).
>>>
>>> But we are getting the following error when we write into db using JDBC.
>>>
>>> “org.apache.spark.sql.AnalysisException: Inner join between two
>>> streaming
>>> DataFrames/Datasets is not supported;”
>>>
>>> Any help would be greatly appreciated.
>>>
>>> Thanks,
>>> Thomas Thomas
>>> Mastermind Solutions LLC.
>>>
>>>
>>>
>>> --
>>> Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
>>>
>>> -
>>> To unsubscribe e-mail: user-unsubscr...@spark.apache.org
>>>
>>>


Execution model in Spark

2018-05-28 Thread Esa Heikkinen
Hi

I don't know whether this question is suitable for this forum, but I'll take the 
risk and ask :)

In my understanding, the execution model in Spark is very data-flow (stream) 
oriented and specific. Is it difficult to build control-flow logic (like a 
state machine) outside of the stream-specific processing?

Is the only way to combine all the different types of event streams into one big 
stream and then process it with some custom stateful "logic"?
And how would one build this logic?
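
For context, the hook Structured Streaming offers for this kind of custom, 
state-machine-like logic per key is mapGroupsWithState / flatMapGroupsWithState; 
a minimal sketch, with all names and types purely illustrative:

import org.apache.spark.sql.streaming.GroupState

// Illustrative only: per-key stateful logic over a stream of events.
// Event/MachineState are made-up types; the real logic would live in `step`.
case class Event(key: String, kind: String)
case class MachineState(lastKind: String, seen: Long)

def step(key: String,
         events: Iterator[Event],
         state: GroupState[MachineState]): MachineState = {
  val old  = state.getOption.getOrElse(MachineState("", 0L))
  val evs  = events.toSeq
  val next = MachineState(evs.lastOption.map(_.kind).getOrElse(old.lastKind),
                          old.seen + evs.size)
  state.update(next)  // keep the state for the next micro-batch
  next
}

// Applied to a keyed stream (with spark.implicits._ in scope):
//   events.groupByKey(_.key).mapGroupsWithState(step _)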

Best Regards, Esa