Stopping StreamingContext

2018-03-29 Thread Sidney Feiner
Hey,
I have a Spark Streaming application processing some events.
Sometimes, I want to stop the application if I get a specific event. I collect 
the executor's results in the driver and, based on those results, I kill the 
StreamingContext using StreamingContext.stop(stopSparkContext=true).
When I do that, I can see in the logs that the app is shutting down, closing 
receivers etc.
But when I go to the master's web UI, I can still see the app under "Running 
Applications", and if I click it, it says the endpoint doesn't exist.
When I check the open processes on the machine, I can see that the job's 
process is still running.
Am I stopping the application the wrong way?

Those are the logs once I call the stop() method:

2018-03-28 11:59:04 INFO  KafkaProducer:615 - Closing the Kafka producer with 
timeoutMillis = 9223372036854775807 ms.
2018-03-28 11:59:04 INFO  ReceiverTracker:54 - Sent stop signal to all 1 
receivers
2018-03-28 11:59:05 INFO  BlockManagerInfo:54 - Added input-0-1522238344750 in 
memory on i-va-spark1:59059 (size: 1632.0 B, free: 579.2 MB)
2018-03-28 11:59:05 ERROR ReceiverTracker:70 - Deregistered receiver for stream 
0: Stopped by driver
2018-03-28 11:59:05 INFO  BlockManagerInfo:54 - Added input-0-1522238345000 in 
memory on i-va-spark1:59059 (size: 272.0 B, free: 579.2 MB)
2018-03-28 11:59:05 INFO  TaskSetManager:54 - Finished task 0.0 in stage 2.0 
(TID 70) in 30213 ms on i-va-spark1 (executor 0) (1/1)
2018-03-28 11:59:05 INFO  TaskSchedulerImpl:54 - Removed TaskSet 2.0, whose 
tasks have all completed, from pool
2018-03-28 11:59:05 INFO  DAGScheduler:54 - ResultStage 2 (start at 
UserLocationHistoryJob.scala:38) finished in 30.213 s
2018-03-28 11:59:05 INFO  ReceiverTracker:54 - All of the receivers have 
deregistered successfully
2018-03-28 11:59:05 INFO  ReceiverTracker:54 - ReceiverTracker stopped
2018-03-28 11:59:05 INFO  JobGenerator:54 - Stopping JobGenerator immediately
2018-03-28 11:59:05 INFO  RecurringTimer:54 - Stopped timer for JobGenerator 
after time 152223834
2018-03-28 11:59:05 INFO  JobGenerator:54 - Stopped JobGenerator
2018-03-28 11:59:07 INFO  JobScheduler:54 - Stopped JobScheduler
2018-03-28 11:59:07 INFO  StreamingContext:54 - StreamingContext stopped 
successfully
2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_5_piece0 on 
10.0.0.243:41976 in memory (size: 2.4 KB, free: 488.4 MB)
2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_5_piece0 on 
i-va-spark1:59059 in memory (size: 2.4 KB, free: 579.2 MB)
2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_4_piece0 on 
10.0.0.243:41976 in memory (size: 23.9 KB, free: 488.4 MB)
2018-03-28 11:59:07 INFO  BlockManagerInfo:54 - Removed broadcast_4_piece0 on 
i-va-spark1:59059 in memory (size: 23.9 KB, free: 579.2 MB)
2018-03-28 11:59:37 WARN  QueuedThreadPool:178 - 
SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop 
Thread[SparkUI-171-selector-ServerConnectorManager@478b3e9/2,5,main]
2018-03-28 11:59:37 WARN  QueuedThreadPool:178 - 
SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop 
Thread[SparkUI-172-selector-ServerConnectorManager@478b3e9/3,5,main]
2018-03-28 11:59:37 WARN  QueuedThreadPool:178 - 
SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop 
Thread[SparkUI-169-selector-ServerConnectorManager@478b3e9/0,5,main]
2018-03-28 11:59:37 WARN  QueuedThreadPool:178 - 
SparkUI{STOPPING,8<=9<=200,i=0,q=4} Couldn't stop 
Thread[SparkUI-170-selector-ServerConnectorManager@478b3e9/1,5,main]
2018-03-28 13:22:01 INFO  DiskBlockManager:54 - Shutdown hook called
2018-03-28 13:22:01 INFO  ShutdownHookManager:54 - Shutdown hook called
2018-03-28 13:22:01 INFO  ShutdownHookManager:54 - Deleting directory 
/data/spark/scratch/spark-69a3a8a6-5504-4153-a4c1-059676861581
2018-03-28 13:22:01 INFO  ShutdownHookManager:54 - Deleting directory 
/data/spark/scratch/spark-69a3a8a6-5504-4153-a4c1-059676861581/userFiles-8a970eec-da41-442b-9ccf-1621b9e9e045



Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp





RE: [PySpark] - running processes and computing time

2017-07-04 Thread Sidney Feiner
To initialize it per executor, I used a class with only class attributes and 
class methods (like an `object` in Scala), but because I was using PySpark, it 
was actually being initialized per process ☹
What I went for in the end was a broadcast variable, but there is still something 
suspicious about my application – the processing time of each batch.

In my logs, I see that when I process a partition, it takes under a second. But 
in the Spark UI I see that a task takes between 3 and 6 seconds.
Shouldn't the partition process time and the task computing time be the same?

My code:


import time

from pyspark import RDD


def process_func(obj, records):
    start = time.time()
    processed_records = ...  # Some processing
    # This logs very small numbers (around 0.05 seconds)
    logger.info("It took {0} seconds to handle records".format(time.time() - start))
    return processed_records


def handle_rdd(rdd: RDD):
    start_time = time.time()
    rdd.foreachPartition(lambda records: process_func(object_broadcast.value, records))
    # This logs much bigger numbers (around 3-6 seconds)
    logger.info("Handle RDD took: {0} seconds".format(time.time() - start_time))


# Keep only the values (cast to TextAnalysis) and process each RDD
ssc.union(*streams)\
    .filter(lambda x: x[1] is not None)\
    .map(lambda x: x[1])\
    .foreachRDD(handle_rdd)
ssc.start()
ssc.awaitTermination()


Each RDD has at most 10 partitions, which means it should take around 0.5 
seconds for all the tasks to be processed.

Does anyone know what is happening here? The time difference is too big for it to be 
just networking, right?

From: Sudev A C [mailto:sudev...@go-mmt.com]
Sent: Monday, July 3, 2017 7:48 PM
To: Sidney Feiner <sidney.fei...@startapp.com>; user@spark.apache.org
Subject: Re: [PySpark] - running processes

You might want to do the initialisation per partition (I'm not sure how you are 
achieving the per-executor initialisation in your code).

To initialise something per partition, you may use something like 
rdd.foreachPartition.

Or if you want something global, like a variable for further processing, you 
might want to initialise it once as a broadcast variable and access the 
data structure through the broadcast variable.

AFAIK, a Python process will be started for per-partition tasks.
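
For example, a minimal sketch of that per-partition initialisation (GraphAnalyzer, 
stream and the file path are illustrative names only, not taken from your code):

def handle_partition(records):
    # The heavy object is built once per partition, inside the executor's Python worker.
    analyzer = GraphAnalyzer("/path/to/graph/file")  # illustrative heavy constructor
    for record in records:
        analyzer.handle(record)                      # illustrative per-record work

# One initialisation per partition of every RDD in the stream.
stream.foreachRDD(lambda rdd: rdd.foreachPartition(handle_partition))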
On Mon, 3 Jul 2017 at 5:23 PM, Sidney Feiner 
<sidney.fei...@startapp.com<mailto:sidney.fei...@startapp.com>> wrote:

In my Spark Streaming application, I have the need to build a graph from a file 
and initializing that graph takes between 5 and 10 seconds.

So I tried initializing it once per executor so it'll be initialized only once.

After running the application, I've noticed that it's initialized much more than 
once per executor, every time with a different process id (every process has 
its own logger).

Doesn't every executor have its own JVM and its own process? Or is that only 
relevant when I develop in JVM languages like Scala/Java? Do executors in 
PySpark spawn new processes for new tasks?

And if they do, how can I make sure that my graph object will really only be 
initiated once?
Thanks :)


Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp







[PySpark] - running processes

2017-07-03 Thread Sidney Feiner
In my Spark Streaming application, I have the need to build a graph from a file 
and initializing that graph takes between 5 and 10 seconds.

So I tried initializing it once per executor so it'll be initialized only once.

After running the application, I've noticed that it's initialized much more than 
once per executor, every time with a different process id (every process has 
its own logger).

Doesn't every executor have its own JVM and its own process? Or is that only 
relevant when I develop in JVM languages like Scala/Java? Do executors in 
PySpark spawn new processes for new tasks?

And if they do, how can I make sure that my graph object will really only be 
initiated once?
Thanks :)
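
One pattern that can help here (just a sketch with made-up names, not taken from the 
original application) is a lazily built module-level singleton. Each Python worker 
process then builds the graph at most once, as long as worker reuse is enabled 
(spark.python.worker.reuse, which defaults to true):

# graph_holder.py -- shipped to the executors (e.g. via --py-files)
_graph = None

def get_graph(graph_file):
    """Build the heavy graph the first time this worker process needs it, then reuse it."""
    global _graph
    if _graph is None:
        _graph = build_graph(graph_file)  # illustrative 5-10 second build
    return _graph

# In the job itself (rdd is any RDD of records in the streaming job):
rdd.foreachPartition(
    lambda records: [get_graph("/path/to/graph/file").process(r) for r in records])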


Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp





RE: Message getting lost in Kafka + Spark Streaming

2017-05-31 Thread Sidney Feiner
Are you sure that every message gets processed? It could be that some messages 
fail to pass the decoder.
And during the processing, are you maybe putting the events into a map? If so, 
events with the same key could overwrite each other, leaving you with fewer 
final events.
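
As a tiny, purely illustrative example of that effect (the names are made up and your 
pipeline is Scala, but the idea is the same):

events = [("user1", "click"), ("user1", "purchase"), ("user2", "click")]
by_key = dict(events)            # later values overwrite earlier ones with the same key
print(len(events), len(by_key))  # 3 2 -> one event silently "lost"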

-Original Message-
From: Vikash Pareek [mailto:vikash.par...@infoobjects.com] 
Sent: Tuesday, May 30, 2017 4:00 PM
To: user@spark.apache.org
Subject: Message getting lost in Kafka + Spark Streaming

I am facing an issue related to Spark Streaming with Kafka. My use case is as follows:
1. A Spark Streaming (DirectStream) application reads data/messages from a Kafka topic and processes them.
2. Based on the processed message, the app writes the processed message to different Kafka topics, e.g. if a message is harmonized it is written to the harmonized topic, otherwise to the unharmonized topic.

The problem is that during streaming we are somehow losing some messages, i.e. 
not all the incoming messages are written to the harmonized or unharmonized topics.
For example, if the app receives 30 messages in one batch, sometimes it writes all 
the messages to the output topics (the expected behaviour), but sometimes it 
writes only 27 (3 messages are lost; this number can change).

Versions are as follows:
Spark 1.6.0
Kafka 0.9

Kafka topic configuration is as follows:
# of brokers: 3
# replication factor: 3
# of partitions: 3

Following are the properties we are using for Kafka:
val props = new Properties()
props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
props.put("group.id", properties.getProperty("group.id"))
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
props.put("acks", "all");
props.put("retries", "5");
props.put("request.required.acks", "-1")
Following is the piece of code where we are writing the processed messages to
Kafka:
val schemaRdd2 = finalHarmonizedDF.toJSON

schemaRdd2.foreachPartition { partition =>
  val producerConfig = new ProducerConfig(props)
  val producer = new Producer[String, String](producerConfig)

  partition.foreach { row =>
    if (debug) println(row.mkString)
    val keyedMessage = new KeyedMessage[String, String](
      props.getProperty("outTopicHarmonized"), null, row.toString())
    producer.send(keyedMessage)
  }
  // hack, should be done with the flush
  Thread.sleep(1000)
  producer.close()
}
We explicitly added the sleep(1000) for testing purposes,
but this is not solving the problem either :(
 
Any suggestion would be appreciated.






RE: [Spark Streaming] - Killing application from within code

2017-05-04 Thread Sidney Feiner
Instead of setting up an additional mechanism, would it be "clean" to catch the 
error back in the driver, and use SparkContext.stop() there?
And because the SparkContext can't be serialized, I can't catch the error inside 
the rdd.foreach function.

What I did eventually and it worked:

ssc.union(uniStreams) foreachRDD { rdd =>
  val futures = rdd mapValues { event =>
handleEvent(event)
  } collect() map(_._2)
  Future.sequence(futures.toList) onFailure {
case ex: Throwable =>
  LoggerManager.getInstance().getLogger.error(s"Unhandled Error caught in 
job, stopping SparkContext. Error: ${ExceptionUtils.getStackTrace(ex)}")
  sc.stop()
  }
}

It collects all the futures into the driver and checks if one of them failed.

If it's not a recommended way of doing it, I'm all ears ☺


From: Tathagata Das [mailto:tathagata.das1...@gmail.com]
Sent: Wednesday, May 3, 2017 10:25 PM
To: Sidney Feiner <sidney.fei...@startapp.com>
Cc: user@spark.apache.org
Subject: Re: [Spark Streaming] - Killing application from within code

There isn't a clean programmatic way to kill the application running in the 
driver from the executor. You will have to set up an additional RPC mechanism to 
explicitly send a signal from the executors to the application/driver to quit.

On Wed, May 3, 2017 at 8:44 AM, Sidney Feiner 
<sidney.fei...@startapp.com<mailto:sidney.fei...@startapp.com>> wrote:
Hey, I'm using connections to Elasticsearch from within my Spark Streaming 
application.
I'm using Futures to maximize performance when it sends network requests to the 
ES cluster.
Basically, I want my app to crash if any one of the executors fails to connect 
to ES.

The exception gets caught and returned in my Future as a Failure(ex: 
NoNodeAvailableException), but when I handle it, I can't seem to kill my app.
I tried using:

fut andThen {
  case Failure(ex: NoNodeAvailableException) =>
throw ex
}

fut andThen {
  case Failure(ex: NoNodeAvailableException) =>
System.exit(-1)
}

fut onFailure {
  case ex: NoNodeAvailableException =>
throw ex
}

fut onFailure {
  case ex: NoNodeAvailableException =>
System.exit(-1)
}


But none of them seem to be killing my app. The System.exit(-1) kills my 
executor but that doesn't seem like the correct way to do it.
And no matter what way I try, the driver stays alive.

Is there a way to programmatically kill the application from within one of the 
workers?

Thanks a lot ☺


Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp






[Spark Streaming] - Killing application from within code

2017-05-03 Thread Sidney Feiner
Hey, I'm using connections to Elasticsearch from within my Spark Streaming 
application.
I'm using Futures to maximize performance when it sends network requests to the 
ES cluster.
Basically, I want my app to crash if any one of the executors fails to connect 
to ES.

The exception gets caught and returned in my Future as a Failure(ex: 
NoNodeAvailableException), but when I handle it, I can't seem to kill my app.
I tried using:

fut andThen {
  case Failure(ex: NoNodeAvailableException) =>
throw ex
}

fut andThen {
  case Failure(ex: NoNodeAvailableException) =>
System.exit(-1)
}

fut onFailure {
  case ex: NoNodeAvailableException =>
throw ex
}

fut onFailure {
  case ex: NoNodeAvailableException =>
System.exit(-1)
}


But none of them seem to be killing my app. The System.exit(-1) kills my 
executor but that doesn't seem like the correct way to do it.
And no matter what way I try, the driver stays alive.

Is there a way to programmatically kill the application from within one of the 
workers?

Thanks a lot :)


Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp





RE: How to run a spark on Pycharm

2017-03-03 Thread Sidney Feiner
Hey,
I once found an article about that:
https://mengdong.github.io/2016/08/08/fully-armed-pyspark-with-ipython-and-jupyter/

And I once managed to set it up in PyCharm as well. What I had to do was add 
/path/to/spark to a system variable called "PYTHONPATH".
Try that one, it might help ☺
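
For reference, a minimal sketch of doing the same thing from code instead of through 
the run configuration (the paths are examples only; adjust them to your own Spark install):

import glob
import os
import sys

spark_home = os.environ.get("SPARK_HOME", "/path/to/spark")
# Make the pyspark package and its bundled py4j importable.
sys.path.insert(0, os.path.join(spark_home, "python"))
sys.path.extend(glob.glob(os.path.join(spark_home, "python", "lib", "py4j-*-src.zip")))

from pyspark import SparkContext

sc = SparkContext("local[2]", "pycharm-test")
print(sc.parallelize(range(10)).sum())  # quick sanity check: prints 45
sc.stop()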

From: Anahita Talebi [mailto:anahita.t.am...@gmail.com]
Sent: Friday, March 3, 2017 5:05 PM
To: Pushkar.Gujar 
Cc: User 
Subject: Re: How to run a spark on Pycharm

Hi,
Thanks for your answer.
Sorry, I am a complete beginner at running code in Spark.
Could you please tell me in a bit more detail how to do that?
I installed IPython and Jupyter notebook on my local machine. But how can I run 
the code using them? Before, I tried to run the code with PyCharm but failed.
Thanks,
Anahita

On Fri, Mar 3, 2017 at 3:48 PM, Pushkar.Gujar wrote:
Jupyter notebook/ipython can be connected to apache spark


Thank you,
Pushkar Gujar


On Fri, Mar 3, 2017 at 9:43 AM, Anahita Talebi wrote:
Hi everyone,
I am trying to run Spark code in PyCharm. I tried to give the path of Spark 
as an environment variable in the PyCharm run configuration. Unfortunately, I get 
an error. Does anyone know how I can run the Spark code in PyCharm?
It doesn't necessarily have to be PyCharm; if you know of any other software, it 
would be nice to tell me.
Thanks a lot,
Anahita





RE: pyspark in intellij

2017-02-25 Thread Sidney Feiner
Yes, I got it working once but I can't exactly remember how.
I think what I did was the following:

- To the environment variables, add a variable named PYTHONPATH with 
the path to your pyspark python directory (in my case, 
C:\spark-2.1.0-bin-hadoop2.7\python)

- To the environment variables, also add the same path as above to the PATH 
variable

Hope these work ☺


Sidney Feiner / SW Developer
M: +972.528197720 / Skype: sidney.feiner.startapp


From: Stephen Boesch [mailto:java...@gmail.com]
Sent: Sunday, February 26, 2017 3:56 AM
To: user <user@spark.apache.org>
Subject: pyspark in intellij

Anyone have this working - either in 1.X or 2.X?

thanks


RE: How to query a query with not contain, not start_with, not end_with condition effective?

2017-02-21 Thread Sidney Feiner
Chanh wants to return user_ids that don't have any record with a url 
containing "sell". Without a subquery/join, a filter can only look at one record 
at a time, without knowing about the rest of that user_id's records.
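
For example, a sketch of one way to express it without NOT IN, using a left anti join 
(available since Spark 2.0). It's written in PySpark syntax for brevity and assumes the 
data is in a DataFrame df with columns user_id and url, as in the examples below:

from pyspark.sql import functions as F

# user_ids that have at least one url containing "sell"
sellers = df.filter(F.col("url").contains("sell")).select("user_id").distinct()

# user_ids with no such record at all
result = (df.select("user_id").distinct()
            .join(sellers, on="user_id", how="left_anti"))
result.show()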

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Yong Zhang [mailto:java8...@hotmail.com]
Sent: Tuesday, February 21, 2017 4:10 PM
To: Chanh Le <giaosu...@gmail.com>; user @spark <user@spark.apache.org>
Subject: Re: How to query a query with not contain, not start_with, not 
end_with condition effective?


Not sure if I misunderstand your question, but what's wrong with doing it this way?


scala> spark.version
res6: String = 2.0.2
scala> val df = Seq((1,"lao.com/sell"), (2, "lao.com/buy")).toDF("user_id", 
"url")
df: org.apache.spark.sql.DataFrame = [user_id: int, url: string]

scala> df.registerTempTable("data")
warning: there was one deprecation warning; re-run with -deprecation for details

scala> spark.sql("select user_id from data where url not like '%sell%'").show
+-------+
|user_id|
+-------+
|      2|
+-------+


Yong


From: Chanh Le <giaosu...@gmail.com<mailto:giaosu...@gmail.com>>
Sent: Tuesday, February 21, 2017 4:56 AM
To: user @spark
Subject: How to query a query with not contain, not start_with, not end_with 
condition effective?

Hi everyone,

I am working on a dataset like this
user_id  url
1        lao.com/buy
2        bao.com/sell
2        cao.com/market
1        lao.com/sell
3        vui.com/sell

I have to find all user_ids whose urls never contain "sell". That means I need to 
query all user_ids that do contain "sell", put them into a set, and then run another 
query to find all user_ids not in that set:
SELECT user_id
FROM data
WHERE user_id not in (SELECT user_id FROM data WHERE url like '%sell%');

My data is about 20 million records and it's growing. When I tried this in Zeppelin 
I had to set spark.sql.crossJoin.enabled = true. Then I ran the query, the driver's 
CPU usage got extremely high, the process got stuck, and I had to kill it.
I am running in client mode, submitting to a Mesos cluster.

I am using Spark 2.0.2 and my data is stored in HDFS in Parquet format.

Any advice for me in this situation?

Thank you in advance!

Regards,
Chanh


RE: Jars directory in Spark 2.0

2017-02-01 Thread Sidney Feiner
Ok, good to know ☺
Shading every spark app it is then…
Thanks!

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Marcelo Vanzin [mailto:van...@cloudera.com]
Sent: Wednesday, February 1, 2017 7:41 PM
To: Sidney Feiner <sidney.fei...@startapp.com>
Cc: Koert Kuipers <ko...@tresata.com>; user@spark.apache.org
Subject: Re: Jars directory in Spark 2.0

Spark has never shaded dependencies (in the sense of renaming the classes), 
with a couple of exceptions (Guava and Jetty). So that behavior is nothing new. 
Spark's dependencies themselves have a lot of other dependencies, so doing that 
would have limited benefits anyway.

On Tue, Jan 31, 2017 at 11:23 PM, Sidney Feiner 
<sidney.fei...@startapp.com<mailto:sidney.fei...@startapp.com>> wrote:
Is this done on purpose? Because it really makes it hard to deploy 
applications. Is there a reason they didn't shade the jars they use to begin 
with?

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Koert Kuipers [mailto:ko...@tresata.com<mailto:ko...@tresata.com>]
Sent: Tuesday, January 31, 2017 7:26 PM
To: Sidney Feiner 
<sidney.fei...@startapp.com<mailto:sidney.fei...@startapp.com>>
Cc: user@spark.apache.org<mailto:user@spark.apache.org>
Subject: Re: Jars directory in Spark 2.0

you basically have to keep your versions of dependencies in line with Spark's, or 
shade your own dependencies.

you cannot just replace the jars in Spark's jars folder. if you want to update 
them you have to build Spark yourself with updated dependencies and confirm it 
compiles, passes tests etc.

On Tue, Jan 31, 2017 at 3:40 AM, Sidney Feiner 
<sidney.fei...@startapp.com<mailto:sidney.fei...@startapp.com>> wrote:
Hey,
While migrating to Spark 2.X from 1.6, I've had many issues with jars that come 
preloaded with Spark in the "jars/" directory and I had to shade most of my 
packages.
Can I replace the jars in this folder with more up-to-date versions? Are those 
jars used for anything internal in Spark, which would mean I can't blindly replace 
them?

Thanks ☺


Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp




--
Marcelo


RE: Jars directory in Spark 2.0

2017-01-31 Thread Sidney Feiner
Is this done on purpose? Because it really makes it hard to deploy 
applications. Is there a reason they didn't shade the jars they use to begin 
with?

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Koert Kuipers [mailto:ko...@tresata.com]
Sent: Tuesday, January 31, 2017 7:26 PM
To: Sidney Feiner <sidney.fei...@startapp.com>
Cc: user@spark.apache.org
Subject: Re: Jars directory in Spark 2.0

you basically have to keep your versions of dependencies in line with Spark's, or 
shade your own dependencies.

you cannot just replace the jars in Spark's jars folder. if you want to update 
them you have to build Spark yourself with updated dependencies and confirm it 
compiles, passes tests etc.

On Tue, Jan 31, 2017 at 3:40 AM, Sidney Feiner 
<sidney.fei...@startapp.com<mailto:sidney.fei...@startapp.com>> wrote:
Hey,
While migrating to Spark 2.X from 1.6, I've had many issues with jars that come 
preloaded with Spark in the "jars/" directory and I had to shade most of my 
packages.
Can I replace the jars in this folder with more up-to-date versions? Are those 
jars used for anything internal in Spark, which would mean I can't blindly replace 
them?

Thanks ☺


Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp



Jars directory in Spark 2.0

2017-01-31 Thread Sidney Feiner
Hey,
While migrating to Spark 2.X from 1.6, I've had many issues with jars that come 
preloaded with Spark in the "jars/" directory and I had to shade most of my 
packages.
Can I replace the jars in this folder with more up-to-date versions? Are those 
jars used for anything internal in Spark, which would mean I can't blindly replace 
them?

Thanks :)


Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp



RE: [PySpark 2.1.0] - SparkContext not properly initialized by SparkConf

2017-01-26 Thread Sidney Feiner
I think I'm getting close to finding the reason:

When I initialize the SparkContext, the following code is executed:
def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, serializer,
             conf, jsc, profiler_cls):
    self.environment = environment or {}
    # java gateway must have been launched at this point.
    if conf is not None and conf._jconf is not None:
        # conf has been initialized in JVM properly, so use conf directly. This represent the
        # scenario that JVM has been launched before SparkConf is created (e.g. SparkContext is
        # created and then stopped, and we create a new SparkConf and new SparkContext again)
        self._conf = conf
    else:
        self._conf = SparkConf(_jvm=SparkContext._jvm)

So I can see that the only way that my SparkConf will be used is if it also has 
a _jvm object.
I've used spark-submit to submit my job and printed the _jvm object but it is 
null, which explains why my SparkConf object is ignored.
I've tried running exactly the same on Spark 2.0.1 and it worked! My SparkConf 
object had a valid _jvm object.
Does anybody know what changed? Or did I get something wrong?
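
For completeness, a minimal diagnostic sketch (the app name and setting below are 
examples, not my real config) that compares what was set on the SparkConf with what 
the live context reports:

from pyspark import SparkConf, SparkContext

spark_conf = SparkConf().setAppName("conf-check").set("spark.executor.memory", "3G")
sc = SparkContext(conf=spark_conf)

print(sorted(spark_conf.getAll()))    # what was requested
print(sorted(sc.getConf().getAll()))  # what the running context actually reports
sc.stop()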

Thanks :)

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Sidney Feiner
Sent: Thursday, January 26, 2017 9:26 AM
To: user@spark.apache.org
Subject: [PySpark 2.1.0] - SparkContext not properly initialized by SparkConf

Hey, I'm pasting a question I asked on Stack Overflow without getting any 
answers :(
I hope somebody here knows the answer, thanks in advance!
Link to 
post<https://stackoverflow.com/questions/41847113/pyspark-2-1-0-sparkcontext-not-properly-initialized-by-sparkconf>
I'm migrating from Spark 1.6 to 2.1.0 and I've run into a problem migrating my 
PySpark application.
I'm dynamically setting up my SparkConf object based on configurations in a 
file and when I was on Spark 1.6, the app would run with the correct configs. 
But now, when I open the Spark UI, I can see that NONE of those configs are 
loaded into the SparkContext. Here's my code:
spark_conf = SparkConf().setAll(
filter(lambda x: x[0].startswith('spark.'), conf_dict.items())
)
sc = SparkContext(conf=spark_conf)
I've also added a print before initializing the SparkContext to make sure the 
SparkConf has all the relevant configs:
[print("{0}: {1}".format(key, value)) for (key, value) in spark_conf.getAll()]
And this outputs all the configs I need:
* spark.app.name: MyApp
* spark.akka.threads: 4
* spark.driver.memory: 2G
* spark.streaming.receiver.maxRate: 25
* spark.streaming.backpressure.enabled: true
* spark.executor.logs.rolling.maxRetainedFiles: 7
* spark.executor.memory: 3G
* spark.cores.max: 24
* spark.executor.cores: 4
* spark.streaming.blockInterval: 350ms
* spark.memory.storageFraction: 0.2
* spark.memory.useLegacyMode: false
* spark.memory.fraction: 0.8
* spark.executor.logs.rolling.time.interval: daily
I submit my job with the following:
/usr/local/spark/bin/spark-submit --conf spark.driver.host=i-${HOSTNAME} 
--master spark://i-${HOSTNAME}:7077 /path/to/main/file.py /path/to/config/file
Does anybody know why my SparkContext doesn't get initialized with my SparkConf?
Thanks :)


Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp




[PySpark 2.1.0] - SparkContext not properly initialized by SparkConf

2017-01-25 Thread Sidney Feiner
Hey, I'm pasting a question I asked on Stack Overflow without getting any 
answers :(
I hope somebody here knows the answer, thanks in advance!
Link to 
post<https://stackoverflow.com/questions/41847113/pyspark-2-1-0-sparkcontext-not-properly-initialized-by-sparkconf>
I'm migrating from Spark 1.6 to 2.1.0 and I've run into a problem migrating my 
PySpark application.
I'm dynamically setting up my SparkConf object based on configurations in a 
file and when I was on Spark 1.6, the app would run with the correct configs. 
But now, when I open the Spark UI, I can see that NONE of those configs are 
loaded into the SparkContext. Here's my code:
spark_conf = SparkConf().setAll(
filter(lambda x: x[0].startswith('spark.'), conf_dict.items())
)
sc = SparkContext(conf=spark_conf)
I've also added a print before initializing the SparkContext to make sure the 
SparkConf has all the relevant configs:
[print("{0}: {1}".format(key, value)) for (key, value) in spark_conf.getAll()]
And this outputs all the configs I need:
* spark.app.name: MyApp
* spark.akka.threads: 4
* spark.driver.memory: 2G
* spark.streaming.receiver.maxRate: 25
* spark.streaming.backpressure.enabled: true
* spark.executor.logs.rolling.maxRetainedFiles: 7
* spark.executor.memory: 3G
* spark.cores.max: 24
* spark.executor.cores: 4
* spark.streaming.blockInterval: 350ms
* spark.memory.storageFraction: 0.2
* spark.memory.useLegacyMode: false
* spark.memory.fraction: 0.8
* spark.executor.logs.rolling.time.interval: daily
I submit my job with the following:
/usr/local/spark/bin/spark-submit --conf spark.driver.host=i-${HOSTNAME} 
--master spark://i-${HOSTNAME}:7077 /path/to/main/file.py /path/to/config/file
Does anybody know why my SparkContext doesn't get initialized with my SparkConf?
Thanks :)


Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp



Re: Spark-submit: where do --files go?

2017-01-19 Thread Sidney Feiner

Every executor creates a directory with your submitted files, and you can access 
each file's absolute path with the following:

val fullFilePath = SparkFiles.get(fileName)
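
The same idea in PySpark, as a small sketch (assuming an existing SparkContext sc; the 
file name is just an example, shipped with --files /local/path/settings.json):

from pyspark import SparkFiles

def read_shipped_file(_):
    # Resolves the absolute path of the shipped file on whatever executor runs this task.
    with open(SparkFiles.get("settings.json")) as f:
        return f.readline()

print(sc.parallelize(range(2), 2).map(read_shipped_file).collect())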

On Jan 19, 2017 19:35, jeff saremi  wrote:

I'd like to know how -- from within Java/Spark -- I can access the dependent 
files which I deploy using the "--files" option on the command line?



RE: [PySpark - 1.6] - Avoid object serialization

2017-01-01 Thread Sidney Feiner
Thanks everybody, but I've found another way of doing it.
Because I didn't actually need an instance of my class, I created a "static" class: 
all variables are initialized as class variables and all methods are class methods.
Thanks a lot anyway, hope my answer will also help someone one day ☺
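
A sketch of what that looks like (the internals are made up for illustration; only the 
class-attribute/classmethod structure matters):

class ShortTextAnalyzer(object):
    graph = None  # class attribute: built only on the executors, so nothing heavy is pickled

    @classmethod
    def init(cls, root_dir):
        if cls.graph is None:                 # built at most once per Python worker process
            cls.graph = load_graph(root_dir)  # illustrative expensive load

    @classmethod
    def analyze_short_text_event(cls, event):
        return cls.graph.lookup(event)        # illustrative usage


def handle_partition(records):
    ShortTextAnalyzer.init("/path/to/root_dir")  # no-op after the first call in this process
    for record in records:
        ShortTextAnalyzer.analyze_short_text_event(record[1])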

Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp


From: Holden Karau [mailto:hol...@pigscanfly.ca]
Sent: Thursday, December 29, 2016 8:54 PM
To: Chawla,Sumit <sumitkcha...@gmail.com>; Eike von Seggern 
<eike.segg...@sevenval.com>
Cc: Sidney Feiner <sidney.fei...@startapp.com>; user@spark.apache.org
Subject: Re: [PySpark - 1.6] - Avoid object serialization

Alternatively, using the broadcast functionality can also help with this.
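
A minimal sketch of that broadcast approach (reusing the names from the original 
snippet and assuming the analyzer is picklable):

# Serialized once for the broadcast and cached on the executors,
# instead of being shipped inside every task closure.
analyzer_broadcast = sc.broadcast(ShortTextAnalyzer(root_dir))

def handle_partition(records):
    analyzer = analyzer_broadcast.value   # materialised lazily on the executor
    for record in records:
        analyzer.analyze_short_text_event(record[1])

ssc.union(*streams).filter(lambda x: x[1] is not None) \
   .foreachRDD(lambda rdd: rdd.foreachPartition(handle_partition))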

On Thu, Dec 29, 2016 at 3:05 AM Eike von Seggern 
<eike.segg...@sevenval.com<mailto:eike.segg...@sevenval.com>> wrote:
2016-12-28 20:17 GMT+01:00 Chawla,Sumit 
<sumitkcha...@gmail.com<mailto:sumitkcha...@gmail.com>>:
Would this work for you?

def processRDD(rdd):
    analyzer = ShortTextAnalyzer(root_dir)
    rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1]))

ssc.union(*streams).filter(lambda x: x[1] != None) \
    .foreachRDD(lambda rdd: processRDD(rdd))

I think this will still send the analyzer to all executors where the RDD's 
partitions are stored.

Maybe you can work around this with `RDD.foreachPartition()`:

def processRDD(rdd):
    def partition_func(records):
        analyzer = ShortTextAnalyzer(root_dir)
        for record in records:
            analyzer.analyze_short_text_event(record[1])
    rdd.foreachPartition(partition_func)

This will create one analyzer per partition and RDD.

Best

Eike


[PySpark - 1.6] - Avoid object serialization

2016-12-28 Thread Sidney Feiner
Hey,
I just posted this question on Stack Overflow (link 
here<http://stackoverflow.com/questions/41362314/pyspark-streaming-job-avoid-object-serialization>)
 and decided to try my luck here as well :)


I'm writing a PySpark job but I've run into some performance issues. Basically, 
all it does is read events from Kafka and log the transformations made. The thing 
is, the transformation is calculated by an object's method, and that object is 
pretty heavy as it contains a Graph and an inner cache which gets automatically 
updated as it processes RDDs. So when I write the following piece of code:

analyzer = ShortTextAnalyzer(root_dir)

logger.info("Start analyzing the documents from kafka")

ssc.union(*streams).filter(lambda x: x[1] != None).foreachRDD(
    lambda rdd: rdd.foreach(lambda record: analyzer.analyze_short_text_event(record[1])))



It serializes my analyzer which takes a lot of time because of the graph, and 
as it is copied to the executor, the cache is only relevant for that specific 
RDD.

If the job was written in Scala, I could have written an Object which would 
exist in every executor and then my object wouldn't have to be serialized each 
time.

I've read in a post (http://www.spark.tc/deserialization-in-pyspark-storage/) 
that prior to PySpark 2.0, objects are always serialized. So does that mean 
that I have no way to avoid the serialization?

I'd love to hear about a way to avoid serialization in PySpark, if one exists: a way 
to have my object created once for each executor so it could avoid the serialization 
process, save time, and actually have a working cache system.

Thanks in advance :)
Sidney Feiner   /  SW Developer
M: +972.528197720  /  Skype: sidney.feiner.startapp
