Distribution of spark 3.0.1 with Hive1.2

2020-11-10 Thread Dmitry
Hi all, I am trying to make distribution 3.0.1 with spark 3 using
./dev/make-distribution.sh --name spark3-hive12 --pip  --tgz  -Phive-1.2
-Phadoop-2.7 -Pyarn
The problem is maven can't found right profile for hive and build ends
without hive jars
++ /Users/reireirei/spark/spark/build/mvn help:evaluate
-Dexpression=project.activeProfiles -pl sql/hive -Phive-1.2 -Phadoop-2.7
-Pyarn
++ grep -v INFO
++ grep -v WARNING
++ fgrep --count 'hive'
++ echo -n
+ SPARK_HIVE=0
+ '[' spark3-hive12 == none ']'
+ echo 'Spark version is 3.0.1'
Spark version is 3.0.1

What is the correct way to build in hive 1.2  jars in spark distribution?


Spark on Kubernetes (minikube) 2.3 fails with class not found exception

2018-04-10 Thread Dmitry
Hello spent a lot of time to find what I did wrong , but not found.
I have a minikube WIndows based cluster ( Hyper V as hypervisor ) and try
to run examples against Spark 2.3. Tried several  docker images builds:
* several  builds that I build myself
* andrusha/spark-k8s:2.3.0-hadoop2.7 from docker  hub
But when I try to submit job driver log returns  class not found exception
org.apache.spark.examples.SparkPi

spark-submit --master k8s://https://ip:8443  --deploy-mode cluster  --name
spark-pi --class org.apache.spark.examples.SparkPi --conf
spark.executor.instances=1 --executor-memory 1G --conf spark.kubernete
s.container.image=andrusha/spark-k8s:2.3.0-hadoop2.7
local:///opt/spark/examples/spark-examples_2.11-2.3.0.jar

I tried to use https://github.com/apache-spark-on-k8s/spark fork and it is
works without problems, more complex examples work also.


Re: Spark on Kubernetes (minikube) 2.3 fails with class not found exception

2018-04-10 Thread Dmitry
Previous example was bad paste( I tried a lot of variants, so sorry for
wrong paste )
PS C:\WINDOWS\system32> spark-submit --master k8s://https://ip:8443
--deploy-mode cluster  --name spark-pi --class
org.apache.spark.examples.SparkPi
--conf spark.executor.instances=1 --executor-memory 1G --conf
spark.kubernete
s.container.image=andrusha/spark-k8s:2.3.0-hadoop2.7
local:///opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
Returns
Image:
andrusha/spark-k8s:2.3.0-hadoop2.7
Environment variables:
SPARK_DRIVER_MEMORY: 1g
SPARK_DRIVER_CLASS: org.apache.spark.examples.SparkPi
SPARK_DRIVER_ARGS:
SPARK_DRIVER_BIND_ADDRESS:
SPARK_MOUNTED_CLASSPATH:
/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar;/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
SPARK_JAVA_OPT_0: -Dspark.kubernetes.driver.pod.name
=spark-pi-46f48a0974d43341886076bc3c5f31c4-driver
SPARK_JAVA_OPT_1:
-Dspark.kubernetes.executor.podNamePrefix=spark-pi-46f48a0974d43341886076bc3c5f31c4
SPARK_JAVA_OPT_2: -Dspark.app.name=spark-pi
SPARK_JAVA_OPT_3:
-Dspark.driver.host=spark-pi-46f48a0974d43341886076bc3c5f31c4-driver-svc.default.svc
SPARK_JAVA_OPT_4: -Dspark.submit.deployMode=cluster
SPARK_JAVA_OPT_5: -Dspark.driver.blockManager.port=7079
SPARK_JAVA_OPT_6: -Dspark.master=k8s://https://ip:8443
SPARK_JAVA_OPT_7:
-Dspark.jars=/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar,/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
SPARK_JAVA_OPT_8:
-Dspark.kubernetes.container.image=andrusha/spark-k8s:2.3.0-hadoop2.7
SPARK_JAVA_OPT_9: -Dspark.executor.instances=1
SPARK_JAVA_OPT_10: -Dspark.app.id=spark-16eb67d8953e418aba96c2d12deecd11
SPARK_JAVA_OPT_11: -Dspark.executor.memory=1G
SPARK_JAVA_OPT_12: -Dspark.driver.port=7078


-Dspark.driver.bindAddress=$SPARK_DRIVER_BIND_ADDRESS $SPARK_DRIVER_CLASS
$SPARK_DRIVER_ARGS)
+ exec /sbin/tini -s -- /usr/lib/jvm/java-1.8-openjdk/bin/java -
Dspark.app.id=spark-16eb67d8953e418aba96c2d12deecd11
-Dspark.executor.memory=1G -Dspark.driver.port=7078
-Dspark.driver.blockManager.port=7079 -Dspark.submit.deployMode=cluster
-Dspark.jars=/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar,/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar
-Dspark.master=k8s://https://172.20.10.12:8443
-Dspark.kubernetes.executor.podNamePrefix=spark-pi-46f48a0974d43341886076bc3c5f31c4
-Dspark.kubernetes.driver.pod.name=spark-pi-46f48a0974d43341886076bc3c5f31c4-driver
-Dspark.driver.host=spark-pi-46f48a0974d43341886076bc3c5f31c4-driver-svc.default.svc
-Dspark.app.name=spark-pi -Dspark.executor.instances=1
-Dspark.kubernetes.container.image=andrusha/spark-k8s:2.3.0-hadoop2.7 -cp
':/opt/spark/jars/*:/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar;/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar'
-Xms1g -Xmx1g -Dspark.driver.bindAddress=172.17.0.2
org.apache.spark.examples.SparkPi
Error: Could not find or load main class org.apache.spark.examples.SparkPi

Found this stackoverflow question
https://stackoverflow.com/questions/49331570/spark-2-3-minikube-kubernetes-windows-demo-sparkpi-not-found
but there is no answer.
I also checked container file system, it contains
/opt/spark/examples/jars/spark-examples_2.11-2.3.0.jar



2018-04-11 1:17 GMT+08:00 Yinan Li :

> The example jar path should be local:///opt/spark/examples/*jars*
> /spark-examples_2.11-2.3.0.jar.
>
> On Tue, Apr 10, 2018 at 1:34 AM, Dmitry  wrote:
>
>> Hello spent a lot of time to find what I did wrong , but not found.
>> I have a minikube WIndows based cluster ( Hyper V as hypervisor ) and try
>> to run examples against Spark 2.3. Tried several  docker images builds:
>> * several  builds that I build myself
>> * andrusha/spark-k8s:2.3.0-hadoop2.7 from docker  hub
>> But when I try to submit job driver log returns  class not found exception
>> org.apache.spark.examples.SparkPi
>>
>> spark-submit --master k8s://https://ip:8443  --deploy-mode cluster
>> --name spark-pi --class org.apache.spark.examples.SparkPi --conf
>> spark.executor.instances=1 --executor-memory 1G --conf spark.kubernete
>> s.container.image=andrusha/spark-k8s:2.3.0-hadoop2.7
>> local:///opt/spark/examples/spark-examples_2.11-2.3.0.jar
>>
>> I tried to use https://github.com/apache-spark-on-k8s/spark fork and it
>> is works without problems, more complex examples work also.
>>
>
>


Re: Migrate Relational to Distributed

2015-05-23 Thread Dmitry Tolpeko
Hi Brant,

Let me partially answer to your concerns: please follow a new open source
project PL/HQL (www.plhql.org) aimed at allowing you to reuse existing
logic and leverage existing skills at some extent, so you do not need to
rewrite everything to Scala/Java and can do this gradually. I hope it can
help.

Thanks,

Dmitry

On Sat, May 23, 2015 at 1:22 AM, Brant Seibert 
wrote:

> Hi,  The healthcare industry can do wonderful things with Apache Spark.
> But,
> there is already a very large base of data and applications firmly rooted
> in
> the relational paradigm and they are resistent to change - stuck on Oracle.
>
> **
> QUESTION 1 - Migrate legacy relational data (plus new transactions) to
> distributed storage?
>
> DISCUSSION 1 - The primary advantage I see is not having to engage in the
> lengthy (1+ years) process of creating a relational data warehouse and
> cubes.  Just store the data in a distributed system and "analyze first" in
> memory with Spark.
>
> **
> QUESTION 2 - Will we have to re-write the enormous amount of logic that is
> already built for the old relational system?
>
> DISCUSSION 2 - If we move the data to distributed, can we simply run that
> existing relational logic as SparkSQL queries?  [existing SQL --> Spark
> Context --> Cassandra --> process in SparkSQL --> display in existing UI].
> Can we create an RDD that uses existing SQL?  Or do we need to rewrite all
> our SQL?
>
> **
> DATA SIZE - We are adding many new data sources to a system that already
> manages health care data for over a million people.  The number of rows may
> not be enormous right now compared to the advertising industry, for
> example,
> but the number of dimensions runs well into the thousands.  If we add to
> this, IoT data for each health care patient, that creates billions of
> events
> per day, and the number of rows then grows exponentially.  We would like to
> be prepared to handle that huge data scenario.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Migrate-Relational-to-Distributed-tp22999.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming from Kafka - no receivers and spark.streaming.receiver.maxRate?

2015-05-27 Thread Dmitry Goldenberg
Got it, thank you, Tathagata and Ted.

Could you comment on my other question

as well?  Basically, I'm trying to get a handle on a good approach to
throttling, on the one hand side, and autoscaling the cluster, on the
other.  Are there any recommended approaches or design patterns for
autoscaling that you have implemented or could point me at? Thanks!

On Wed, May 27, 2015 at 8:08 PM, Tathagata Das  wrote:

> You can throttle the no receiver direct Kafka stream using
> spark.streaming.kafka.maxRatePerPartition
> 
>
>
> On Wed, May 27, 2015 at 4:34 PM, Ted Yu  wrote:
>
>> Have you seen
>> http://stackoverflow.com/questions/29051579/pausing-throttling-spark-spark-streaming-application
>> ?
>>
>> Cheers
>>
>> On Wed, May 27, 2015 at 4:11 PM, dgoldenberg 
>> wrote:
>>
>>> Hi,
>>>
>>> With the no receivers approach to streaming from Kafka, is there a way to
>>> set something like spark.streaming.receiver.maxRate so as not to
>>> overwhelm
>>> the Spark consumers?
>>>
>>> What would be some of the ways to throttle the streamed messages so that
>>> the
>>> consumers don't run out of memory?
>>>
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-from-Kafka-no-receivers-and-spark-streaming-receiver-maxRate-tp23061.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-27 Thread Dmitry Goldenberg
Thanks, Rajesh.  I think that acquring/relinquishing executors is important
but I feel like there are at least two layers for resource allocation and
autoscaling.  It seems that acquiring and relinquishing executors is a way
to optimize resource utilization within a pre-set Spark cluster of machines.

However, to accommodate for big spikes in input data, we also need the
actual cluster scaling, i.e. adding (or removing, when no longer needed)
worker node machines automatically.  On that front, I wonder how Spark
reacts to a machine being added or removed and what the actual procedure
would be.  If we're running on a Hadoop cluster, there's a description of
adding a node

there.  There's also discussions of Hadoop node adding/removal such as this
one

.

My worry is, will Spark "gracefully" and "quickly" detect the presence of a
new node and start utilizing it (i.e. how much does it communicate with the
Hadoop cluster manager?)...  By the same token, if a node is removed, how
can it be removed gracefully so as not to affect/kill any running Spark
jobs?

On Wed, May 27, 2015 at 10:57 PM,  wrote:

> *Dell - Internal Use - Confidential *
>
> Did you check
> https://drive.google.com/file/d/0B7tmGAdbfMI2OXl6azYySk5iTGM/edit and
> http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Not sure if the spark kafka receiver emits metrics on the lag, check this
>  link out
> http://community.spiceworks.com/how_to/77610-how-far-behind-is-your-kafka-consumer
>
>
>
> You should be able to whip up a script that runs the Kafka
> ConsumerOffsetChecker periodically and pipe it to a metrics backend of your
> choice. Based on this you can work the dynamic resource allocation magic.
>
> -Original Message-
> From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
> Sent: Wednesday, May 27, 2015 6:21 PM
> To: user@spark.apache.org
> Subject: Autoscaling Spark cluster based on topic sizes/rate of growth in
> Kafka or Spark's metrics?
>
> Hi,
>
> I'm trying to understand if there are design patterns for autoscaling
> Spark (add/remove slave machines to the cluster) based on the throughput.
>
> Assuming we can throttle Spark consumers, the respective Kafka topics we
> stream data from would start growing. What are some of the ways to generate
> the metrics on the number of new messages and the rate they are piling up?
> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
> with the Metric interface and not much else...
>
> What are some of the ways to expand/contract the Spark cluster? Someone
> has mentioned Mesos...
>
> I see some info on Spark metrics in the Spark monitoring guide . Do we
> want to perhaps implement a custom sink that would help us autoscale up or
> down based on the throughput?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thank you, Gerard.

We're looking at the receiver-less setup with Kafka Spark streaming so I'm
not sure how to apply your comments to that case (not that we have to use
receiver-less but it seems to offer some advantages over the
receiver-based).

As far as "the number of Kafka receivers is fixed for the lifetime of your
DStream" -- this may be OK to start with. What I'm researching is the
ability to add worker nodes to the Spark cluster when needed and remove
them when no longer needed.  Do I understand correctly that a single
receiver may cause work to be farmed out to multiple 'slave'
machines/worker nodes?  If that's the case, we're less concerned with
multiple receivers; we're concerned with the worker node cluster itself.

If we use the ConsumerOffsetChecker class in Kafka that Rajesh mentioned
and instrument dynamic adding/removal of machines, my subsequent questions
then are, a) will Spark sense the addition of a new node / is it sufficient
that the cluster manager is aware, then work just starts flowing there?
 and  b) what would be a way to gracefully remove a worker node when the
load subsides, so that no currently running Spark job is killed?

- Dmitry

On Thu, May 28, 2015 at 7:36 AM, Gerard Maas  wrote:

> Hi,
>
> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
> streaming processes is not supported.
>
>
> *Longer version.*
>
> I assume that you are talking about Spark Streaming as the discussion is
> about handing Kafka streaming data.
>
> Then you have two things to consider: the Streaming receivers and the
> Spark processing cluster.
>
> Currently, the receiving topology is static. One receiver is allocated
> with each DStream instantiated and it will use 1 core in the cluster. Once
> the StreamingContext is started, this topology cannot be changed, therefore
> the number of Kafka receivers is fixed for the lifetime of your DStream.
> What we do is to calculate the cluster capacity and use that as a fixed
> upper bound (with a margin) for the receiver throughput.
>
> There's work in progress to add a reactive model to the receiver, where
> backpressure can be applied to handle overload conditions. See
> https://issues.apache.org/jira/browse/SPARK-7398
>
> Once the data is received, it will be processed in a 'classical' Spark
> pipeline, so previous posts on spark resource scheduling might apply.
>
> Regarding metrics, the standard metrics subsystem of spark will report
> streaming job performance. Check the driver's metrics endpoint to peruse
> the available metrics:
>
> :/metrics/json
>
> -kr, Gerard.
>
>
> (*) Spark is a project that moves so fast that statements might be
> invalidated by new work every minute.
>
> On Thu, May 28, 2015 at 1:21 AM, dgoldenberg 
> wrote:
>
>> Hi,
>>
>> I'm trying to understand if there are design patterns for autoscaling
>> Spark
>> (add/remove slave machines to the cluster) based on the throughput.
>>
>> Assuming we can throttle Spark consumers, the respective Kafka topics we
>> stream data from would start growing.  What are some of the ways to
>> generate
>> the metrics on the number of new messages and the rate they are piling up?
>> This perhaps is more of a Kafka question; I see a pretty sparse javadoc
>> with
>> the Metric interface and not much else...
>>
>> What are some of the ways to expand/contract the Spark cluster? Someone
>> has
>> mentioned Mesos...
>>
>> I see some info on Spark metrics in  the Spark monitoring guide
>> <https://spark.apache.org/docs/latest/monitoring.html>  .  Do we want to
>> perhaps implement a custom sink that would help us autoscale up or down
>> based on the throughput?
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Autoscaling-Spark-cluster-based-on-topic-sizes-rate-of-growth-in-Kafka-or-Spark-s-metrics-tp23062.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thanks, Evo.  Per the last part of your comment, it sounds like we will
need to implement a job manager which will be in control of starting the
jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
marking them as ones to relaunch, scaling the cluster up/down by
adding/removing machines, and relaunching the 'suspended' (shut down) jobs.

I suspect that relaunching the jobs may be tricky since that means keeping
track of the starter offsets in Kafka topic(s) from which the jobs started
working on.

Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
of jobs, coupled with the wait for the new machines to come online may turn
out quite time-consuming which will make for lengthy request times, and our
requests are not asynchronous.  Ideally, the currently running jobs would
continue to run on the machines currently available in the cluster.

In the scale-down case, the job manager would want to signal to Spark's job
scheduler not to send work to the node being taken out, find out when the
last job has finished running on the node, then take the node out.

This is somewhat like changing the number of cylinders in a car engine
while the car is running...

Sounds like a great candidate for a set of enhancements in Spark...

On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov  wrote:

> @DG; The key metrics should be
>
>
>
> -  Scheduling delay – its ideal state is to remain constant over
> time and ideally be less than the time of the microbatch window
>
> -  The average job processing time should remain less than the
> micro-batch window
>
> -  Number of Lost Jobs – even if there is a single Job lost that
> means that you have lost all messages for the DStream RDD processed by that
> job due to the previously described spark streaming memory leak condition
> and subsequent crash – described in previous postings submitted by me
>
>
>
> You can even go one step further and periodically issue “get/check free
> memory” to see whether it is decreasing relentlessly at a constant rate –
> if it touches a predetermined RAM threshold that should be your third
> metric
>
>
>
> Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
> you can implement one on your own without waiting for Jiras and new
> features whenever they might be implemented by the Spark dev team –
> moreover you can avoid using slow mechanisms such as ZooKeeper and even
> incorporate some Machine Learning in your Feedback Loop to make it handle
> the message consumption rate more intelligently and benefit from ongoing
> online learning – BUT this is STILL about voluntarily sacrificing your
> performance in the name of keeping your system stable – it is not about
> scaling your system/solution
>
>
>
> In terms of how to scale the Spark Framework Dynamically – even though
> this is not supported at the moment out of the box I guess you can have a
> sys management framework spin dynamically a few more boxes (spark worker
> nodes), stop dynamically your currently running Spark Streaming Job,
> relaunch it with new params e.g. more Receivers, larger number of
> Partitions (hence tasks), more RAM per executor etc. Obviously this will
> cause some temporary delay in fact interruption in your processing but if
> the business use case can tolerate that then go for it
>
>
>
> *From:* Gerard Maas [mailto:gerard.m...@gmail.com]
> *Sent:* Thursday, May 28, 2015 12:36 PM
> *To:* dgoldenberg
> *Cc:* spark users
> *Subject:* Re: Autoscaling Spark cluster based on topic sizes/rate of
> growth in Kafka or Spark's metrics?
>
>
>
> Hi,
>
>
>
> tl;dr At the moment (with a BIG disclaimer *) elastic scaling of spark
> streaming processes is not supported.
>
>
>
>
>
> *Longer version.*
>
>
>
> I assume that you are talking about Spark Streaming as the discussion is
> about handing Kafka streaming data.
>
>
>
> Then you have two things to consider: the Streaming receivers and the
> Spark processing cluster.
>
>
>
> Currently, the receiving topology is static. One receiver is allocated
> with each DStream instantiated and it will use 1 core in the cluster. Once
> the StreamingContext is started, this topology cannot be changed, therefore
> the number of Kafka receivers is fixed for the lifetime of your DStream.
>
> What we do is to calculate the cluster capacity and use that as a fixed
> upper bound (with a margin) for the receiver throughput.
>
>
>
> There's work in progress to add a reactive model to the receiver, where
> backpressure can be applied to handle overload conditions. See
> https://issues.apache.org/jira/browse/SPARK-7398
>
>
>
> Once the data is received, it will be processed in a 'classical' Spark
> pipeline, so previous posts on spark resource scheduling might apply.
>
>
>
> Regarding metrics, the standard metrics subsystem of spark will report
> streaming job performance. Check the driver's metrics endpoint to peruse
> the available metrics:
>
>
>
> :/metrics/json
>
>
>
> -kr, Gera

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Evo, good points.

On the dynamic resource allocation, I'm surmising this only works within a
particular cluster setup.  So it improves the usage of current cluster
resources but it doesn't make the cluster itself elastic. At least, that's
my understanding.

Memory + disk would be good and hopefully it'd take *huge* load on the
system to start exhausting the disk space too.  I'd guess that falling onto
disk will make things significantly slower due to the extra I/O.

Perhaps we'll really want all of these elements eventually.  I think we'd
want to start with memory only, keeping maxRate low enough not to overwhelm
the consumers; implement the cluster autoscaling.  We might experiment with
dynamic resource allocation before we get to implement the cluster
autoscale.



On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov  wrote:

> You can also try Dynamic Resource Allocation
>
>
>
>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Also re the Feedback Loop for automatic message consumption rate
> adjustment – there is a “dumb” solution option – simply set the storage
> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
> exhausted spark streaming will resort to keeping new RDDs on disk which
> will prevent it from crashing and hence loosing them. Then some memory will
> get freed and it will resort back to RAM and so on and so forth
>
>
>
>
>
> Sent from Samsung Mobile
>
> ---- Original message 
>
> From: Evo Eftimov
>
> Date:2015/05/28 13:22 (GMT+00:00)
>
> To: Dmitry Goldenberg
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> You can always spin new boxes in the background and bring them into the
> cluster fold when fully operational and time that with job relaunch and
> param change
>
>
>
> Kafka offsets are mabaged automatically for you by the kafka clients which
> keep them in zoomeeper dont worry about that ad long as you shut down your
> job gracefuly. Besides msnaging the offsets explicitly is not a big deal if
> necessary
>
>
>
>
>
> Sent from Samsung Mobile
>
>
>
>  Original message 
>
> From: Dmitry Goldenberg
>
> Date:2015/05/28 13:16 (GMT+00:00)
>
> To: Evo Eftimov
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> Thanks, Evo.  Per the last part of your comment, it sounds like we will
> need to implement a job manager which will be in control of starting the
> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
> marking them as ones to relaunch, scaling the cluster up/down by
> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>
>
>
> I suspect that relaunching the jobs may be tricky since that means keeping
> track of the starter offsets in Kafka topic(s) from which the jobs started
> working on.
>
>
>
> Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
> of jobs, coupled with the wait for the new machines to come online may turn
> out quite time-consuming which will make for lengthy request times, and our
> requests are not asynchronous.  Ideally, the currently running jobs would
> continue to run on the machines currently available in the cluster.
>
>
>
> In the scale-down case, the job manager would want to signal to Spark's
> job scheduler not to send work to the node being taken out, find out when
> the last job has finished running on the node, then take the node out.
>
>
>
> This is somewhat like changing the number of cylinders in a car engine
> while the car is running...
>
>
>
> Sounds like a great candidate for a set of enhancements in Spark...
>
>
>
> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov 
> wrote:
>
> @DG; The key metrics should be
>
>
>
> -  Scheduling delay – its ideal state is to remain constant over
> time and ideally be less than the time of the microbatch window
>
> -  The average job processing time should remain less than the
> micro-batch window
>
> -  Number of Lost Jobs – even if there is a single Job lost that
> means that you have lost all messages for the DStream RDD processed by that
> job due to the previously described spark streaming memory leak condition
> and subsequent crash – described in previous postings submitted by me
>
>
>
> You can even go one step further and periodically issue “get/check free
> memory” to 

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Thanks, Andrew.

>From speaking with customers, this is one of the most pressing issues for
them (burning hot, to be precise), especially in a SAAS type of environment
and especially with commodity hardware at play. Understandably, folks don't
want to pay for more hardware usage than necessary and they want to be able
to handle the peaks and valleys of usage (especially the peaks) optimally.

It looks like there needs to be a generic 'watchdog' type of service which
would get metrics/signals from things like Kafka, then call into a
(potentially custom) handler which will cause new hardware to be
provisioned or decomissioned.  Needless to say, both Spark, the watchdog,
and the provisioner need to be completely in sync and mindful of currently
running Spark jobs so that new hardware immediately picks up extra load and
hardware is only decommissioned as any running Spark jobs have been
acquiesced...

As I learn more about the configuration parameters and dynamic resource
allocation, I'm starting to feel that a dashboard with all these knobs
exposed would be so useful. Being able to test/simulate load volumes and
tweak the knobs as necessary, to arrive at the optimal patterns...

Regards,
- Dmitry

On Thu, May 28, 2015 at 3:21 PM, Andrew Or  wrote:

> Hi all,
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than the batch
> duration, then you'll end up having to constantly kill and restart
> executors. If your idle timeout is greater than the batch duration, then
> you'll never kill executors.
>
> Long answer short, with Spark streaming there is currently no
> straightforward way to scale the size of your cluster. I had a long
> discussion with TD (Spark streaming lead) about what needs to be done to
> provide some semblance of dynamic scaling to streaming applications, e.g.
> take into account the batch queue instead. We came up with a few ideas that
> I will not detail here, but we are looking into this and do intend to
> support it in the near future.
>
> -Andrew
>
>
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>
>> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>> – it will be your insurance policy against sys crashes due to memory leaks.
>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>> and of course resorting to disk from time to time (ie when there is no free
>> RAM ) and taking a performance hit from that, BUT only until there is no
>> free RAM
>>
>>
>>
>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>> *Sent:* Thursday, May 28, 2015 2:34 PM
>> *To:* Evo Eftimov
>> *Cc:* Gerard Maas; spark users
>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>> sizes/rate of growth in Kafka or Spark's metrics?
>>
>>
>>
>> Evo, good points.
>>
>>
>>
>> On the dynamic resource allocation, I'm surmising this only works within
>> a particular cluster setup.  So it improves the usage of current cluster
>> resources but it doesn't make the cluster itself elastic. At least, that's
>> my understanding.
>>
>>
>>
>> Memory + disk would be good and hopefully it'd take *huge* load on the
>> system to start exhausting the disk space too.  I'd guess that falling onto
>> disk will make things significantly slower due to the extra I/O.
>>
>>
>>
>> Perhaps we'll really want all of these elements eventually.  I think we'd
>> want to start with memory only, keeping maxRate low enough not to overwhelm
>> the consumers; implement the cluster autoscaling.  We might experiment with
>> dynamic resource allocation before we get to implement the cluster
>> autoscale.
>>
>>
>>
>>
>>
>>
>>
>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>> wrote:
>>
>> You can also try Dynamic Resource Allocation
>>
>>
>>
>>
>> https://spark.apache.org/docs/1.3.1/job-scheduling.ht

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-05-28 Thread Dmitry Goldenberg
Which would imply that if there was a load manager type of service, it
could signal to the driver(s) that they need to acquiesce, i.e. process
what's at hand and terminate.  Then bring up a new machine, then restart
the driver(s)...  Same deal with removing machines from the cluster. Send a
signal for the drivers to pipe down and terminate, then restart them.

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger  wrote:

> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
>> Hi all,
>>
>> As the author of the dynamic allocation feature I can offer a few
>> insights here.
>>
>> Gerard's explanation was both correct and concise: dynamic allocation is
>> not intended to be used in Spark streaming at the moment (1.4 or before).
>> This is because of two things:
>>
>> (1) Number of receivers is necessarily fixed, and these are started in
>> executors. Since we need a receiver for each InputDStream, if we kill these
>> receivers we essentially stop the stream, which is not what we want. It
>> makes little sense to close and restart a stream the same way we kill and
>> relaunch executors.
>>
>> (2) Records come in every batch, and when there is data to process your
>> executors are not idle. If your idle timeout is less than the batch
>> duration, then you'll end up having to constantly kill and restart
>> executors. If your idle timeout is greater than the batch duration, then
>> you'll never kill executors.
>>
>> Long answer short, with Spark streaming there is currently no
>> straightforward way to scale the size of your cluster. I had a long
>> discussion with TD (Spark streaming lead) about what needs to be done to
>> provide some semblance of dynamic scaling to streaming applications, e.g.
>> take into account the batch queue instead. We came up with a few ideas that
>> I will not detail here, but we are looking into this and do intend to
>> support it in the near future.
>>
>> -Andrew
>>
>>
>>
>> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>>
>> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>>> – it will be your insurance policy against sys crashes due to memory leaks.
>>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>>> and of course resorting to disk from time to time (ie when there is no free
>>> RAM ) and taking a performance hit from that, BUT only until there is no
>>> free RAM
>>>
>>>
>>>
>>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>>> *Sent:* Thursday, May 28, 2015 2:34 PM
>>> *To:* Evo Eftimov
>>> *Cc:* Gerard Maas; spark users
>>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>>> sizes/rate of growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> Evo, good points.
>>>
>>>
>>>
>>> On the dynamic resource allocation, I'm surmising this only works within
>>> a particular cluster setup.  So it improves the usage of current cluster
>>> resources but it doesn't make the cluster itself elastic. At least, that's
>>> my understanding.
>>>
>>>
>>>
>>> Memory + disk would be good and hopefully it'd take *huge* load on the
>>> system to start exhausting the disk space too.  I'd guess that falling onto
>>> disk will make things significantly slower due to the extra I/O.
>>>
>>>
>>>
>>> Perhaps we'll really want all of these elements eventually.  I think
>>> we'd want to start with memory only, keeping maxRate low enough not to
>>> overwhelm the consumers; implement the cluster autoscaling.  We might
>>> experiment with dynamic resource allocation before we get to implement the
>>> cluster autoscale.
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>>> wrote:
>>>
>>> You can also try Dynamic Resource Allocation
>>>
>>>
>>>
>>>
>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>
>>>
>>>
>>> Also re the Feedback Loop for automatic message consumption rate
>>> adjustment – there is a “dumb” solution option – simply set the storage
>>> policy for the DStream RDDs t

Re: How to monitor Spark Streaming from Kafka?

2015-06-01 Thread Dmitry Goldenberg
Thank you, Tathagata, Cody, Otis.

- Dmitry


On Mon, Jun 1, 2015 at 6:57 PM, Otis Gospodnetic  wrote:

> I think you can use SPM - http://sematext.com/spm - it will give you all
> Spark and all Kafka metrics, including offsets broken down by topic, etc.
> out of the box.  I see more and more people using it to monitor various
> components in data processing pipelines, a la
> http://blog.sematext.com/2015/04/22/monitoring-stream-processing-tools-cassandra-kafka-and-spark/
>
> Otis
>
> On Mon, Jun 1, 2015 at 5:23 PM, dgoldenberg 
> wrote:
>
>> Hi,
>>
>> What are some of the good/adopted approached to monitoring Spark Streaming
>> from Kafka?  I see that there are things like
>> http://quantifind.github.io/KafkaOffsetMonitor, for example.  Do they all
>> assume that Receiver-based streaming is used?
>>
>> Then "Note that one disadvantage of this approach (Receiverless Approach,
>> #2) is that it does not update offsets in Zookeeper, hence Zookeeper-based
>> Kafka monitoring tools will not show progress. However, you can access the
>> offsets processed by this approach in each batch and update Zookeeper
>> yourself".
>>
>> The code sample, however, seems sparse. What do you need to do here? -
>>  directKafkaStream.foreachRDD(
>>  new Function, Void>() {
>>  @Override
>>  public Void call(JavaPairRDD rdd) throws
>> IOException {
>>  OffsetRange[] offsetRanges =
>> ((HasOffsetRanges)rdd).offsetRanges
>>  // offsetRanges.length = # of Kafka partitions being consumed
>>  ...
>>  return null;
>>  }
>>  }
>>  );
>>
>> and if these are updated, will KafkaOffsetMonitor work?
>>
>> Monitoring seems to center around the notion of a consumer group.  But in
>> the receiverless approach, code on the Spark consumer side doesn't seem to
>> expose a consumer group parameter.  Where does it go?  Can I/should I just
>> pass in group.id as part of the kafkaParams HashMap?
>>
>> Thanks
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-monitor-Spark-Streaming-from-Kafka-tp23103.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Objects serialized before foreachRDD/foreachPartition ?

2015-06-03 Thread Dmitry Goldenberg
So Evo, option b is to "singleton" the Param, as in your modified snippet,
i.e. instantiate is once per an RDD.

But if I understand correctly the a) option is broadcast, meaning
instantiation is in the Driver once before any transformations and actions,
correct?  That's where my serialization costs concerns were.  There's the
Kryo serialization but Param might still be too heavy.  If some of its
member variables are lazy loaded we may be OK.  But it seems then on every
worker node the lazy initialization would have to happen to load these lazy
loaded resources into Param - ?

public class Param {
   // ==> potentially a very hefty resource to load
   private Map dictionary = new HashMap();
   ...
}

I'm groking that Spark will serialize Param right before the call to
foreachRDD, if we're to broadcast...



On Wed, Jun 3, 2015 at 9:58 AM, Evo Eftimov  wrote:

> Dmitry was concerned about the “serialization cost” NOT the “memory
> footprint – hence option a) is still viable since a Broadcast is performed
> only ONCE for the lifetime of Driver instance
>
>
>
> *From:* Ted Yu [mailto:yuzhih...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 2:44 PM
> *To:* Evo Eftimov
> *Cc:* dgoldenberg; user
> *Subject:* Re: Objects serialized before foreachRDD/foreachPartition ?
>
>
>
> Considering memory footprint of param as mentioned by Dmitry, option b
> seems better.
>
>
>
> Cheers
>
>
>
> On Wed, Jun 3, 2015 at 6:27 AM, Evo Eftimov  wrote:
>
> Hmmm a spark streaming app code doesn't execute in the linear fashion
> assumed in your previous code snippet - to achieve your objectives you
> should do something like the following
>
> in terms of your second objective - saving the initialization and
> serialization of the params you can:
>
> a) broadcast them
> b) have them as a Singleton (initialized from e.g. params in a file on
> HDFS)
> on each Executor
>
> messageBodies.foreachRDD(new Function, Void>() {
>
> Param param = new Param();
> param.initialize();
>
>   @Override
>   public Void call(JavaRDD rdd) throws Exception {
> ProcessPartitionFunction func = new
> ProcessPartitionFunction(param);
> rdd.foreachPartition(func);
> return null;
>   }
>
> });
>
> //put this in e.g. the object destructor
> param.deinitialize();
>
>
> -Original Message-
> From: dgoldenberg [mailto:dgoldenberg...@gmail.com]
> Sent: Wednesday, June 3, 2015 1:56 PM
> To: user@spark.apache.org
> Subject: Objects serialized before foreachRDD/foreachPartition ?
>
> I'm looking at https://spark.apache.org/docs/latest/tuning.html.
> Basically
> the takeaway is that all objects passed into the code processing RDD's must
> be serializable. So if I've got a few objects that I'd rather initialize
> once and deinitialize once outside of the logic processing the RDD's, I'd
> need to think twice about the costs of serializing such objects, it would
> seem.
>
> In the below, does the Spark serialization happen before calling foreachRDD
> or before calling foreachPartition?
>
> Param param = new Param();
> param.initialize();
> messageBodies.foreachRDD(new Function, Void>() {
>   @Override
>   public Void call(JavaRDD rdd) throws Exception {
> ProcessPartitionFunction func = new
> ProcessPartitionFunction(param);
> rdd.foreachPartition(func);
> return null;
>   }
> });
> param.deinitialize();
>
> If param gets initialized to a significant memory footprint, are we better
> off creating/initializing it before calling new ProcessPartitionFunction()
> or perhaps in the 'call' method within that function?
>
> I'm trying to avoid calling expensive init()/deinit() methods while
> balancing against the serialization costs. Thanks.
>
>
>
> --
> View this message in context:
>
> http://apache-spark-user-list.1001560.n3.nabble.com/Objects-serialized-befor
> e-foreachRDD-foreachPartition-tp23134.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
Would it be possible to implement Spark autoscaling somewhat along these
lines? --

1. If we sense that a new machine is needed, by watching the data load in
Kafka topic(s), then
2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
and get a machine);
3. Create a "shadow"/mirror Spark master running alongside the initial
version which talks to N machines. The new mirror version is aware of N+1
machines (or N+M if we had decided we needed M new boxes).
4. The previous version of the Spark runtime is acquiesced/decommissioned.
We possibly get both clusters working on the same data which may actually
be OK (at least for our specific use-cases).
5. Now the new Spark cluster is running.

Similarly, the decommissioning of M unused boxes would happen, via this
notion of a mirror Spark runtime.  How feasible would it be for such a
mirrorlike setup to be created, especially created programmatically?
Especially point #3.

The other idea we'd entertained was to bring in a new machine, acquiesce
down all currently running workers by telling them to process their current
batch then shut down, then restart the consumers now that Spark is aware of
a modified cluster.  This has the drawback of a downtime that may not be
tolerable in terms of latency, by the system's clients waiting for their
responses in a synchronous fashion.

Thanks.

On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger  wrote:

> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
>> Hi all,
>>
>> As the author of the dynamic allocation feature I can offer a few
>> insights here.
>>
>> Gerard's explanation was both correct and concise: dynamic allocation is
>> not intended to be used in Spark streaming at the moment (1.4 or before).
>> This is because of two things:
>>
>> (1) Number of receivers is necessarily fixed, and these are started in
>> executors. Since we need a receiver for each InputDStream, if we kill these
>> receivers we essentially stop the stream, which is not what we want. It
>> makes little sense to close and restart a stream the same way we kill and
>> relaunch executors.
>>
>> (2) Records come in every batch, and when there is data to process your
>> executors are not idle. If your idle timeout is less than the batch
>> duration, then you'll end up having to constantly kill and restart
>> executors. If your idle timeout is greater than the batch duration, then
>> you'll never kill executors.
>>
>> Long answer short, with Spark streaming there is currently no
>> straightforward way to scale the size of your cluster. I had a long
>> discussion with TD (Spark streaming lead) about what needs to be done to
>> provide some semblance of dynamic scaling to streaming applications, e.g.
>> take into account the batch queue instead. We came up with a few ideas that
>> I will not detail here, but we are looking into this and do intend to
>> support it in the near future.
>>
>> -Andrew
>>
>>
>>
>> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>>
>> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
>>> – it will be your insurance policy against sys crashes due to memory leaks.
>>> Until there is free RAM, spark streaming (spark) will NOT resort to disk –
>>> and of course resorting to disk from time to time (ie when there is no free
>>> RAM ) and taking a performance hit from that, BUT only until there is no
>>> free RAM
>>>
>>>
>>>
>>> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
>>> *Sent:* Thursday, May 28, 2015 2:34 PM
>>> *To:* Evo Eftimov
>>> *Cc:* Gerard Maas; spark users
>>> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
>>> sizes/rate of growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> Evo, good points.
>>>
>>>
>>>
>>> On the dynamic resource allocation, I'm surmising this only works within
>>> a particular cluster setup.  So it improves the usage of current cluster
>>> resources but it doesn't make the cluster itself elastic. At least, that's
>>> my understanding.
>>>
>>>
>>>
>>> Memory + disk would be good and hopefully it'd take *huge* load on the
>>> system to start exhausting the disk space too.  I'd guess that falling onto
>>> disk will make things significantly slower due to the extra I/O.
>>>
>>>
>>&

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
Evo,

One of the ideas is to shadow the current cluster. This way there's no
extra latency incurred due to shutting down of the consumers. If two sets
of consumers are running, potentially processing the same data, that is OK.
We phase out the older cluster and gradually flip over to the new one,
insuring no downtime or extra latency.  Thoughts?

On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov  wrote:

> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
>
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
>
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than the batch
> duration, then you'll end up having to constantly kill and restart
> executors. If your idle timeout is greater than the batch duration, then
> you'll never kill executors.
>
>
>
> Long answer short, with Spark streaming there is currently no
> straightforward way to scale the size of your cluster. I had a long
> discussion with TD (Spark streaming lead) about what needs to be done to
> provide some semblance of dynamic scaling to streaming applications, e.g.
> take into account the batch queue instead. We came up with a few ideas that
> I will not detail here, but we are looking into this and do intend to
> support it in the near future.
>
>
>
> -Andrew
>
>
>
>
>
>
>
> 2015-05-28 8:02 GMT-07:00 Evo Eftimov :
>
>
>
> Probably you should ALWAYS keep the RDD storage policy to MEMORY AND DISK
> – it will be your insurance policy against sys crashes due to m

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
Great.

"You should monitor vital performance / job clogging stats of the Spark
Streaming Runtime not “kafka topics” -- anything specific you were thinking
of?

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov  wrote:

> Makes sense especially if you have a cloud with “infinite” resources /
> nodes which allows you to double, triple etc in the background/parallel the
> resources of the currently running cluster
>
>
>
> I was thinking more about the scenario where you have e.g. 100 boxes and
> want to / can add e.g. 20 more
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:46 PM
> *To:* Evo Eftimov
> *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo,
>
>
>
> One of the ideas is to shadow the current cluster. This way there's no
> extra latency incurred due to shutting down of the consumers. If two sets
> of consumers are running, potentially processing the same data, that is OK.
> We phase out the older cluster and gradually flip over to the new one,
> insuring no downtime or extra latency.  Thoughts?
>
>
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov 
> wrote:
>
> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
>
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and restart a stream the same way we kill and
> relaunch executors.
>
>
>
> (2) Records come in every batch, and when there is data to process your
> executors are not idle. If your idle timeout is less than th

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
If we have a hand-off between the older consumer and the newer consumer, I
wonder if we need to manually manage the offsets in Kafka so as not to miss
some messages as the hand-off is happening.

Or if we let the new consumer run for a bit then let the old consumer know
the 'new guy is in town' then the old consumer can be shut off.  Some
overlap is OK...

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov  wrote:

> Makes sense especially if you have a cloud with “infinite” resources /
> nodes which allows you to double, triple etc in the background/parallel the
> resources of the currently running cluster
>
>
>
> I was thinking more about the scenario where you have e.g. 100 boxes and
> want to / can add e.g. 20 more
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:46 PM
> *To:* Evo Eftimov
> *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo,
>
>
>
> One of the ideas is to shadow the current cluster. This way there's no
> extra latency incurred due to shutting down of the consumers. If two sets
> of consumers are running, potentially processing the same data, that is OK.
> We phase out the older cluster and gradually flip over to the new one,
> insuring no downtime or extra latency.  Thoughts?
>
>
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov 
> wrote:
>
> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:
>
>
>
> (1) Number of receivers is necessarily fixed, and these are started in
> executors. Since we need a receiver for each InputDStream, if we kill these
> receivers we essentially stop the stream, which is not what we want. It
> makes little sense to close and rest

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-03 Thread Dmitry Goldenberg
I think what we'd want to do is track the ingestion rate in the consumer(s)
via Spark's aggregation functions and such. If we're at a critical level
(load too high / load too low) then we issue a request into our
Provisioning Component to add/remove machines. Once it comes back with an
"OK", each consumer can finish its current batch, then terminate itself,
and restart with a new context.  The new context would be aware of the
updated cluster - correct?  Therefore the refreshed consumer would restart
on the updated cluster.

Could we even terminate the consumer immediately upon sensing a critical
event?  When it would restart, could it resume right where it left off?

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov  wrote:

> Makes sense especially if you have a cloud with “infinite” resources /
> nodes which allows you to double, triple etc in the background/parallel the
> resources of the currently running cluster
>
>
>
> I was thinking more about the scenario where you have e.g. 100 boxes and
> want to / can add e.g. 20 more
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:46 PM
> *To:* Evo Eftimov
> *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo,
>
>
>
> One of the ideas is to shadow the current cluster. This way there's no
> extra latency incurred due to shutting down of the consumers. If two sets
> of consumers are running, potentially processing the same data, that is OK.
> We phase out the older cluster and gradually flip over to the new one,
> insuring no downtime or extra latency.  Thoughts?
>
>
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov 
> wrote:
>
> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> Thi

Re: StreamingListener, anyone?

2015-06-04 Thread Dmitry Goldenberg
Shixiong,

Thanks, interesting point. So if we want to only process one batch then
terminate the consumer, what's the best way to achieve that? Presumably the
listener could set a flag on the driver notifying it that it can terminate.
But the driver is not in a loop, it's basically blocked in
awaitTermination.  So what would be a way to trigger the termination in the
driver?

"context.awaitTermination() allows the current thread to wait for the
termination of a context by stop() or by an exception" - presumably, we
need to call stop() somewhere or perhaps throw.

Cheers,
- Dmitry

On Thu, Jun 4, 2015 at 3:55 AM, Shixiong Zhu  wrote:

> You should not call `jssc.stop(true);` in a StreamingListener. It will
> cause a dead-lock: `jssc.stop` won't return until `listenerBus` exits. But
> since `jssc.stop` blocks `StreamingListener`, `listenerBus` cannot exit.
>
> Best Regards,
> Shixiong Zhu
>
> 2015-06-04 0:39 GMT+08:00 dgoldenberg :
>
>> Hi,
>>
>> I've got a Spark Streaming driver job implemented and in it, I register a
>> streaming listener, like so:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>Durations.milliseconds(params.getBatchDurationMillis()));
>> jssc.addStreamingListener(new JobListener(jssc));
>>
>> where JobListener is defined like so
>> private static class JobListener implements StreamingListener {
>>
>> private JavaStreamingContext jssc;
>>
>> JobListener(JavaStreamingContext jssc) {
>> this.jssc = jssc;
>> }
>>
>> @Override
>> public void
>> onBatchCompleted(StreamingListenerBatchCompleted
>> batchCompleted) {
>> System.out.println(">> Batch completed.");
>> jssc.stop(true);
>> System.out.println(">> The job has been
>> stopped.");
>> }
>> 
>>
>> I do not seem to be seeing onBatchCompleted being triggered.  Am I doing
>> something wrong?
>>
>> In this particular case, I was trying to implement a bulk ingest type of
>> logic where the first batch is all we're interested in (reading out of a
>> Kafka topic with offset reset set to "smallest").
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/StreamingListener-anyone-tp23140.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-04 Thread Dmitry Goldenberg
"set the storage policy for the DStream RDDs to MEMORY AND DISK" - it
appears the storage level can be specified in the createStream methods but
not createDirectStream...


On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov  wrote:

> You can also try Dynamic Resource Allocation
>
>
>
>
> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>
>
>
> Also re the Feedback Loop for automatic message consumption rate
> adjustment – there is a “dumb” solution option – simply set the storage
> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
> exhausted spark streaming will resort to keeping new RDDs on disk which
> will prevent it from crashing and hence loosing them. Then some memory will
> get freed and it will resort back to RAM and so on and so forth
>
>
>
>
>
> Sent from Samsung Mobile
>
>  Original message 
>
> From: Evo Eftimov
>
> Date:2015/05/28 13:22 (GMT+00:00)
>
> To: Dmitry Goldenberg
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> You can always spin new boxes in the background and bring them into the
> cluster fold when fully operational and time that with job relaunch and
> param change
>
>
>
> Kafka offsets are mabaged automatically for you by the kafka clients which
> keep them in zoomeeper dont worry about that ad long as you shut down your
> job gracefuly. Besides msnaging the offsets explicitly is not a big deal if
> necessary
>
>
>
>
>
> Sent from Samsung Mobile
>
>
>
>  Original message 
>
> From: Dmitry Goldenberg
>
> Date:2015/05/28 13:16 (GMT+00:00)
>
> To: Evo Eftimov
>
> Cc: Gerard Maas ,spark users
>
> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of growth
> in Kafka or Spark's metrics?
>
>
>
> Thanks, Evo.  Per the last part of your comment, it sounds like we will
> need to implement a job manager which will be in control of starting the
> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
> marking them as ones to relaunch, scaling the cluster up/down by
> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>
>
>
> I suspect that relaunching the jobs may be tricky since that means keeping
> track of the starter offsets in Kafka topic(s) from which the jobs started
> working on.
>
>
>
> Ideally, we'd want to avoid a re-launch.  The 'suspension' and relaunching
> of jobs, coupled with the wait for the new machines to come online may turn
> out quite time-consuming which will make for lengthy request times, and our
> requests are not asynchronous.  Ideally, the currently running jobs would
> continue to run on the machines currently available in the cluster.
>
>
>
> In the scale-down case, the job manager would want to signal to Spark's
> job scheduler not to send work to the node being taken out, find out when
> the last job has finished running on the node, then take the node out.
>
>
>
> This is somewhat like changing the number of cylinders in a car engine
> while the car is running...
>
>
>
> Sounds like a great candidate for a set of enhancements in Spark...
>
>
>
> On Thu, May 28, 2015 at 7:52 AM, Evo Eftimov 
> wrote:
>
> @DG; The key metrics should be
>
>
>
> -  Scheduling delay – its ideal state is to remain constant over
> time and ideally be less than the time of the microbatch window
>
> -  The average job processing time should remain less than the
> micro-batch window
>
> -  Number of Lost Jobs – even if there is a single Job lost that
> means that you have lost all messages for the DStream RDD processed by that
> job due to the previously described spark streaming memory leak condition
> and subsequent crash – described in previous postings submitted by me
>
>
>
> You can even go one step further and periodically issue “get/check free
> memory” to see whether it is decreasing relentlessly at a constant rate –
> if it touches a predetermined RAM threshold that should be your third
> metric
>
>
>
> Re the “back pressure” mechanism – this is a Feedback Loop mechanism and
> you can implement one on your own without waiting for Jiras and new
> features whenever they might be implemented by the Spark dev team –
> moreover you can avoid using slow mechanisms such as ZooKeeper and even
> incorporate some Machine Learning in your Feedback Loop to make it handle
> the message consumption rate more intelligently and benefit from ongoing
>

Re: How to share large resources like dictionaries while processing data with Spark ?

2015-06-04 Thread Dmitry Goldenberg
Thanks so much, Yiannis, Olivier, Huang!

On Thu, Jun 4, 2015 at 6:44 PM, Yiannis Gkoufas 
wrote:

> Hi there,
>
> I would recommend checking out
> https://github.com/spark-jobserver/spark-jobserver which I think gives
> the functionality you are looking for.
> I haven't tested it though.
>
> BR
>
> On 5 June 2015 at 01:35, Olivier Girardot  wrote:
>
>> You can use it as a broadcast variable, but if it's "too" large (more
>> than 1Gb I guess), you may need to share it joining this using some kind of
>> key to the other RDDs.
>> But this is the kind of thing broadcast variables were designed for.
>>
>> Regards,
>>
>> Olivier.
>>
>> Le jeu. 4 juin 2015 à 23:50, dgoldenberg  a
>> écrit :
>>
>>> We have some pipelines defined where sometimes we need to load
>>> potentially
>>> large resources such as dictionaries.
>>>
>>> What would be the best strategy for sharing such resources among the
>>> transformations/actions within a consumer?  Can they be shared somehow
>>> across the RDD's?
>>>
>>> I'm looking for a way to load such a resource once into the cluster
>>> memory
>>> and have it be available throughout the lifecycle of a consumer...
>>>
>>> Thanks.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Re: How to share large resources like dictionaries while processing data with Spark ?

2015-06-05 Thread Dmitry Goldenberg
Thanks everyone. Evo, could you provide a link to the Lookup RDD project? I
can't seem to locate it exactly on Github. (Yes, to your point, our project
is Spark streaming based). Thank you.

On Fri, Jun 5, 2015 at 6:04 AM, Evo Eftimov  wrote:

> Oops, @Yiannis, sorry to be a party pooper but the Job Server is for Spark
> Batch Jobs (besides anyone can put something like that in 5 min), while I
> am under the impression that Dmytiy is working on Spark Streaming app
>
>
>
> Besides the Job Server is essentially for sharing the Spark Context
> between multiple threads
>
>
>
> Re Dmytiis intial question – you can load large data sets as Batch
> (Static) RDD from any Spark Streaming App and then join DStream RDDs
> against them to emulate “lookups” , you can also try the “Lookup RDD” –
> there is a git hub project
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Friday, June 5, 2015 12:12 AM
> *To:* Yiannis Gkoufas
> *Cc:* Olivier Girardot; user@spark.apache.org
> *Subject:* Re: How to share large resources like dictionaries while
> processing data with Spark ?
>
>
>
> Thanks so much, Yiannis, Olivier, Huang!
>
>
>
> On Thu, Jun 4, 2015 at 6:44 PM, Yiannis Gkoufas 
> wrote:
>
> Hi there,
>
>
>
> I would recommend checking out
> https://github.com/spark-jobserver/spark-jobserver which I think gives
> the functionality you are looking for.
>
> I haven't tested it though.
>
>
>
> BR
>
>
>
> On 5 June 2015 at 01:35, Olivier Girardot  wrote:
>
> You can use it as a broadcast variable, but if it's "too" large (more than
> 1Gb I guess), you may need to share it joining this using some kind of key
> to the other RDDs.
>
> But this is the kind of thing broadcast variables were designed for.
>
>
>
> Regards,
>
>
>
> Olivier.
>
>
>
> Le jeu. 4 juin 2015 à 23:50, dgoldenberg  a
> écrit :
>
> We have some pipelines defined where sometimes we need to load potentially
> large resources such as dictionaries.
>
> What would be the best strategy for sharing such resources among the
> transformations/actions within a consumer?  Can they be shared somehow
> across the RDD's?
>
> I'm looking for a way to load such a resource once into the cluster memory
> and have it be available throughout the lifecycle of a consumer...
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-large-resources-like-dictionaries-while-processing-data-with-Spark-tp23162.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>
>
>


Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-09 Thread Dmitry Goldenberg
At which point would I call cache()?  I just want the runtime to spill to
disk when necessary without me having to know when the "necessary" is.


On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger  wrote:

> direct stream isn't a receiver, it isn't required to cache data anywhere
> unless you want it to.
>
> If you want it, just call cache.
>
> On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it
>> appears the storage level can be specified in the createStream methods but
>> not createDirectStream...
>>
>>
>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>> wrote:
>>
>>> You can also try Dynamic Resource Allocation
>>>
>>>
>>>
>>>
>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>
>>>
>>>
>>> Also re the Feedback Loop for automatic message consumption rate
>>> adjustment – there is a “dumb” solution option – simply set the storage
>>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>>> exhausted spark streaming will resort to keeping new RDDs on disk which
>>> will prevent it from crashing and hence loosing them. Then some memory will
>>> get freed and it will resort back to RAM and so on and so forth
>>>
>>>
>>>
>>>
>>>
>>> Sent from Samsung Mobile
>>>
>>>  Original message 
>>>
>>> From: Evo Eftimov
>>>
>>> Date:2015/05/28 13:22 (GMT+00:00)
>>>
>>> To: Dmitry Goldenberg
>>>
>>> Cc: Gerard Maas ,spark users
>>>
>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>> growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> You can always spin new boxes in the background and bring them into the
>>> cluster fold when fully operational and time that with job relaunch and
>>> param change
>>>
>>>
>>>
>>> Kafka offsets are mabaged automatically for you by the kafka clients
>>> which keep them in zoomeeper dont worry about that ad long as you shut down
>>> your job gracefuly. Besides msnaging the offsets explicitly is not a big
>>> deal if necessary
>>>
>>>
>>>
>>>
>>>
>>> Sent from Samsung Mobile
>>>
>>>
>>>
>>>  Original message 
>>>
>>> From: Dmitry Goldenberg
>>>
>>> Date:2015/05/28 13:16 (GMT+00:00)
>>>
>>> To: Evo Eftimov
>>>
>>> Cc: Gerard Maas ,spark users
>>>
>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>> growth in Kafka or Spark's metrics?
>>>
>>>
>>>
>>> Thanks, Evo.  Per the last part of your comment, it sounds like we will
>>> need to implement a job manager which will be in control of starting the
>>> jobs, monitoring the status of the Kafka topic(s), shutting jobs down and
>>> marking them as ones to relaunch, scaling the cluster up/down by
>>> adding/removing machines, and relaunching the 'suspended' (shut down) jobs.
>>>
>>>
>>>
>>> I suspect that relaunching the jobs may be tricky since that means
>>> keeping track of the starter offsets in Kafka topic(s) from which the jobs
>>> started working on.
>>>
>>>
>>>
>>> Ideally, we'd want to avoid a re-launch.  The 'suspension' and
>>> relaunching of jobs, coupled with the wait for the new machines to come
>>> online may turn out quite time-consuming which will make for lengthy
>>> request times, and our requests are not asynchronous.  Ideally, the
>>> currently running jobs would continue to run on the machines currently
>>> available in the cluster.
>>>
>>>
>>>
>>> In the scale-down case, the job manager would want to signal to Spark's
>>> job scheduler not to send work to the node being taken out, find out when
>>> the last job has finished running on the node, then take the node out.
>>>
>>>
>>>
>>> This is somewhat like changing the number of cylinders in a car engine
>>> while the car is running...
>>>
>>>
>>>
>>> Sounds like a great candidate for a set of enhancements in Spark...
>>>
>>>
>>

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-11 Thread Dmitry Goldenberg
If I want to restart my consumers into an updated cluster topology after
the cluster has been expanded or contracted, would I need to call stop() on
them, then call start() on them, or would I need to instantiate and start
new context objects (new JavaStreamingContext(...)) ?  I'm thinking of
actually acquiescing these streaming consumers but letting them finish
their current batch first.

Right now I'm doing

jssc.start();
jssc.awaitTermination();

Must jssc.close() be called as well, after awaitTermination(), to avoid
potentially leaking contexts?  I don't see that in things
like JavaDirectKafkaWordCount but wondering if that's needed.

On Wed, Jun 3, 2015 at 11:49 AM, Evo Eftimov  wrote:

> Makes sense especially if you have a cloud with “infinite” resources /
> nodes which allows you to double, triple etc in the background/parallel the
> resources of the currently running cluster
>
>
>
> I was thinking more about the scenario where you have e.g. 100 boxes and
> want to / can add e.g. 20 more
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:46 PM
> *To:* Evo Eftimov
> *Cc:* Cody Koeninger; Andrew Or; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Evo,
>
>
>
> One of the ideas is to shadow the current cluster. This way there's no
> extra latency incurred due to shutting down of the consumers. If two sets
> of consumers are running, potentially processing the same data, that is OK.
> We phase out the older cluster and gradually flip over to the new one,
> insuring no downtime or extra latency.  Thoughts?
>
>
>
> On Wed, Jun 3, 2015 at 11:27 AM, Evo Eftimov 
> wrote:
>
> You should monitor vital performance / job clogging stats of the Spark
> Streaming Runtime not “kafka topics”
>
>
>
> You should be able to bring new worker nodes online and make them contact
> and register with the Master without bringing down the Master (or any of
> the currently running worker nodes)
>
>
>
> Then just shutdown your currently running spark streaming job/app and
> restart it with new params to take advantage of the larger cluster
>
>
>
> *From:* Dmitry Goldenberg [mailto:dgoldenberg...@gmail.com]
> *Sent:* Wednesday, June 3, 2015 4:14 PM
> *To:* Cody Koeninger
> *Cc:* Andrew Or; Evo Eftimov; Gerard Maas; spark users
> *Subject:* Re: FW: Re: Autoscaling Spark cluster based on topic
> sizes/rate of growth in Kafka or Spark's metrics?
>
>
>
> Would it be possible to implement Spark autoscaling somewhat along these
> lines? --
>
>
>
> 1. If we sense that a new machine is needed, by watching the data load in
> Kafka topic(s), then
>
> 2. Provision a new machine via a Provisioner interface (e.g. talk to AWS
> and get a machine);
>
> 3. Create a "shadow"/mirror Spark master running alongside the initial
> version which talks to N machines. The new mirror version is aware of N+1
> machines (or N+M if we had decided we needed M new boxes).
>
> 4. The previous version of the Spark runtime is
> acquiesced/decommissioned.  We possibly get both clusters working on the
> same data which may actually be OK (at least for our specific use-cases).
>
> 5. Now the new Spark cluster is running.
>
>
>
> Similarly, the decommissioning of M unused boxes would happen, via this
> notion of a mirror Spark runtime.  How feasible would it be for such a
> mirrorlike setup to be created, especially created programmatically?
> Especially point #3.
>
>
>
> The other idea we'd entertained was to bring in a new machine, acquiesce
> down all currently running workers by telling them to process their current
> batch then shut down, then restart the consumers now that Spark is aware of
> a modified cluster.  This has the drawback of a downtime that may not be
> tolerable in terms of latency, by the system's clients waiting for their
> responses in a synchronous fashion.
>
>
>
> Thanks.
>
>
>
> On Thu, May 28, 2015 at 5:15 PM, Cody Koeninger 
> wrote:
>
> I'm not sure that points 1 and 2 really apply to the kafka direct stream.
> There are no receivers, and you know at the driver how big each of your
> batches is.
>
>
>
> On Thu, May 28, 2015 at 2:21 PM, Andrew Or  wrote:
>
> Hi all,
>
>
>
> As the author of the dynamic allocation feature I can offer a few insights
> here.
>
>
>
> Gerard's explanation was both correct and concise: dynamic allocation is
> not intended to be used in Spark streaming at the moment (1.4 or before).
> This is because of two things:

Re: FW: Re: Autoscaling Spark cluster based on topic sizes/rate of growth in Kafka or Spark's metrics?

2015-06-11 Thread Dmitry Goldenberg
; getting used to run tasks, including reading from Kafka (remember, Kafka
> direct approach reads from Kafka like a file system, from any node that
> runs the task). So it will immediately start using the extra resources, no
> need to do anything further.
>
> (b) Receiver: This is definitely tricky. If you dont need to increase the
> number of receivers, then a new executor will start getting used for
> computations (shuffles, writing out, etc.), but the parallelism in
> receiving will not increase. If you need to increase that, then its best to
> shutdown the context gracefully (so that no data is lost), and a new
> StreamingContext can be started with more receivers (# receivers <= #
> executors), and may be more #partitions for shuffles. You have call stop on
> currently running streaming context, to start a new one. If a context is
> stopped, any thread stuck in awaitTermniation will get unblocked.
>
> Does that clarify things?
>
>
>
>
>
>
>
> On Thu, Jun 11, 2015 at 7:30 AM, Cody Koeninger 
> wrote:
>
>> Depends on what you're reusing multiple times (if anything).
>>
>> Read
>> http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence
>>
>> On Wed, Jun 10, 2015 at 12:18 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> At which point would I call cache()?  I just want the runtime to spill
>>> to disk when necessary without me having to know when the "necessary" is.
>>>
>>>
>>> On Thu, Jun 4, 2015 at 9:42 AM, Cody Koeninger 
>>> wrote:
>>>
>>>> direct stream isn't a receiver, it isn't required to cache data
>>>> anywhere unless you want it to.
>>>>
>>>> If you want it, just call cache.
>>>>
>>>> On Thu, Jun 4, 2015 at 8:20 AM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> "set the storage policy for the DStream RDDs to MEMORY AND DISK" - it
>>>>> appears the storage level can be specified in the createStream methods but
>>>>> not createDirectStream...
>>>>>
>>>>>
>>>>> On Thu, May 28, 2015 at 9:05 AM, Evo Eftimov 
>>>>> wrote:
>>>>>
>>>>>> You can also try Dynamic Resource Allocation
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> https://spark.apache.org/docs/1.3.1/job-scheduling.html#dynamic-resource-allocation
>>>>>>
>>>>>>
>>>>>>
>>>>>> Also re the Feedback Loop for automatic message consumption rate
>>>>>> adjustment – there is a “dumb” solution option – simply set the storage
>>>>>> policy for the DStream RDDs to MEMORY AND DISK – when the memory gets
>>>>>> exhausted spark streaming will resort to keeping new RDDs on disk which
>>>>>> will prevent it from crashing and hence loosing them. Then some memory 
>>>>>> will
>>>>>> get freed and it will resort back to RAM and so on and so forth
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> Sent from Samsung Mobile
>>>>>>
>>>>>>  Original message 
>>>>>>
>>>>>> From: Evo Eftimov
>>>>>>
>>>>>> Date:2015/05/28 13:22 (GMT+00:00)
>>>>>>
>>>>>> To: Dmitry Goldenberg
>>>>>>
>>>>>> Cc: Gerard Maas ,spark users
>>>>>>
>>>>>> Subject: Re: Autoscaling Spark cluster based on topic sizes/rate of
>>>>>> growth in Kafka or Spark's metrics?
>>>>>>
>>>>>>
>>>>>>
>>>>>> You can always spin new boxes in the background and bring them into
>>>>>> the cluster fold when fully operational and time that with job relaunch 
>>>>>> and
>>>>>> param change
>>>>>>
>>>>>>
>>>>>>
>>>>>> Kafka offsets are mabaged automatically for you by the kafka clients
>>>>>> which keep them in zoomeeper dont worry about that ad long as you shut 
>>>>>> down
>>>>>> your job gracefuly. Besides msnaging the offsets explicitly is not a big
>>>>>> deal if necessary
>>>>>>
>>>>>>
&

Re: Registering custom metrics

2015-06-22 Thread Dmitry Goldenberg
Great, thank you, Silvio. In your experience, is there any way to instument
a callback into Coda Hale or the Spark consumers from the metrics sink?  If
the sink performs some steps once it has received the metrics, I'd like to
be able to make the consumers aware of that via some sort of a callback..

On Mon, Jun 22, 2015 at 10:14 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

> Sorry, replied to Gerard’s question vs yours.
>
> See here:
>
> Yes, you have to implement your own custom Metrics Source using the Code
> Hale library. See here for some examples:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/metrics/source/JvmSource.scala
>
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/master/ApplicationSource.scala
>
> The source gets registered, then you have to configure a sink for it just
> as the JSON servlet you mentioned.
>
> I had done it in the past but don’t have the access to the source for that
> project anymore unfortunately.
>
> Thanks,
> Silvio
>
>
>
>
>
>
> On 6/22/15, 9:57 AM, "dgoldenberg"  wrote:
>
> >Hi Gerard,
> >
> >Have there been any responses? Any insights as to what you ended up doing
> to
> >enable custom metrics? I'm thinking of implementing a custom metrics sink,
> >not sure how doable that is yet...
> >
> >Thanks.
> >
> >
> >
> >--
> >View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Registering-custom-metrics-tp17765p23426.html
> >Sent from the Apache Spark User List mailing list archive at Nabble.com.
> >
> >-
> >To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >For additional commands, e-mail: user-h...@spark.apache.org
> >
>


Re: Any way to retrieve time of message arrival to Kafka topic, in Spark Streaming?

2015-06-23 Thread Dmitry Goldenberg
Yes, Akhil. We already have an origination timestamp in the body of the message 
when we send it. But we can't guarantee the network speed nor a precise enough 
synchronization of clocks across machines.

Pulling the timestamp from Kafka itself would be a step forward although the 
broker is most likely going to be not collocated with the consumers, so this 
would still be imprecise.

> On Jun 23, 2015, at 3:46 AM, Akhil Das  wrote:
> 
> May be while producing the messages, you can make it as a keyedMessage with 
> the timestamp as key and on the consumer end you can easily identify the key 
> (which will be the timestamp) from the message. If the network is fast 
> enough, then i think there could be a small millisecond lag.
> 
> Thanks
> Best Regards
> 
>> On Tue, Jun 23, 2015 at 10:22 AM, dgoldenberg  
>> wrote:
>> Is there any way to retrieve the time of each message's arrival into a Kafka
>> topic, when streaming in Spark, whether with receiver-based or direct
>> streaming?
>> 
>> Thanks.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Any-way-to-retrieve-time-of-message-arrival-to-Kafka-topic-in-Spark-Streaming-tp23442.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: Best practice for using singletons on workers (seems unanswered) ?

2015-07-08 Thread Dmitry Goldenberg
Richard,

That's exactly the strategy I've been trying, which is a wrapper singleton
class. But I was seeing the inner object being created multiple times.

I wonder if the problem has to do with the way I'm processing the RDD's.
I'm using JavaDStream to stream data (from Kafka). Then I'm processing the
RDD's like so

JavaPairInputDStream messages =
KafkaUtils.createDirectStream(...)
JavaDStream messageBodies = messages.map(...)
messageBodies.foreachRDD(new MyFunction());

where MyFunction implements Function, Void> {
  ...
  rdd.map / rdd.filter ...
  rdd.foreach(... perform final action ...)
}

Perhaps the multiple singletons I'm seeing are the per-executor instances?
Judging by the streaming programming guide, perhaps I should follow the
connection sharing example:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
val connection = createNewConnection()
partitionOfRecords.foreach(record => connection.send(record))
connection.close()
  }
}

So I'd pre-create my singletons in the foreachPartition call which would
let them be the per-JVM singletons, to be passed into MyFunction which
would now be a partition processing function rather than an RDD processing
function.

I wonder whether these singletons would still be created on every call as
the master sends RDD data over to the workers ?

I also wonder whether using foreachPartition would be more efficient anyway
and prevent some of the over-network data shuffling effects that I imagine
may happen with just doing a foreachRDD ?











On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher 
wrote:

> Would it be possible to have a wrapper class that just represents a
> reference to a singleton holding the 3rd party object? It could proxy over
> calls to the singleton object which will instantiate a private instance of
> the 3rd party object lazily? I think something like this might work if the
> workers have the singleton object in their classpath.
>
> here's a rough sketch of what I was thinking:
>
> object ThirdPartySingleton {
>   private lazy val thirdPartyObj = ...
>
>   def someProxyFunction() = thirdPartyObj.()
> }
>
> class ThirdPartyReference extends Serializable {
>   def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
> }
>
> also found this SO post:
> http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers
>
>
> On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg 
> wrote:
>
>> Hi,
>>
>> I am seeing a lot of posts on singletons vs. broadcast variables, such as
>> *
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
>> *
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>>
>> What's the best approach to instantiate an object once and have it be
>> reused
>> by the worker(s).
>>
>> E.g. I have an object that loads some static state such as e.g. a
>> dictionary/map, is a part of 3rd party API and is not serializable.  I
>> can't
>> seem to get it to be a singleton on the worker side as the JVM appears to
>> be
>> wiped on every request so I get a new instance.  So the singleton doesn't
>> stick.
>>
>> Is there an approach where I could have this object or a wrapper of it be
>> a
>> broadcast var? Can Kryo get me there? would that basically mean writing a
>> custom serializer?  However, the 3rd party object may have a bunch of
>> member
>> vars hanging off it, so serializing it properly may be non-trivial...
>>
>> Any pointers/hints greatly appreciated.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: Best practice for using singletons on workers (seems unanswered) ?

2015-07-08 Thread Dmitry Goldenberg
My singletons do in fact stick around. They're one per worker, looks like.
So with 4 workers running on the box, we're creating one singleton per
worker process/jvm, which seems OK.

Still curious about foreachPartition vs. foreachRDD though...

On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher 
wrote:

> Would it be possible to have a wrapper class that just represents a
> reference to a singleton holding the 3rd party object? It could proxy over
> calls to the singleton object which will instantiate a private instance of
> the 3rd party object lazily? I think something like this might work if the
> workers have the singleton object in their classpath.
>
> here's a rough sketch of what I was thinking:
>
> object ThirdPartySingleton {
>   private lazy val thirdPartyObj = ...
>
>   def someProxyFunction() = thirdPartyObj.()
> }
>
> class ThirdPartyReference extends Serializable {
>   def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
> }
>
> also found this SO post:
> http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers
>
>
> On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg 
> wrote:
>
>> Hi,
>>
>> I am seeing a lot of posts on singletons vs. broadcast variables, such as
>> *
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
>> *
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>>
>> What's the best approach to instantiate an object once and have it be
>> reused
>> by the worker(s).
>>
>> E.g. I have an object that loads some static state such as e.g. a
>> dictionary/map, is a part of 3rd party API and is not serializable.  I
>> can't
>> seem to get it to be a singleton on the worker side as the JVM appears to
>> be
>> wiped on every request so I get a new instance.  So the singleton doesn't
>> stick.
>>
>> Is there an approach where I could have this object or a wrapper of it be
>> a
>> broadcast var? Can Kryo get me there? would that basically mean writing a
>> custom serializer?  However, the 3rd party object may have a bunch of
>> member
>> vars hanging off it, so serializing it properly may be non-trivial...
>>
>> Any pointers/hints greatly appreciated.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: foreachRDD vs. forearchPartition ?

2015-07-08 Thread Dmitry Goldenberg
"These are quite different operations. One operates on RDDs in  DStream and
one operates on partitions of an RDD. They are not alternatives."

Sean, different operations as they are, they can certainly be used on the
same data set.  In that sense, they are alternatives. Code can be written
using one or the other which reaches the same effect - likely at a
different efficiency cost.

The question is, what are the effects of applying one vs. the other?

My specific scenario is, I'm streaming data out of Kafka.  I want to
perform a few transformations then apply an action which results in e.g.
writing this data to Solr.  According to Evo, my best bet is
foreachPartition because of increased parallelism (which I'd need to grok
to understand the details of what that means).

Another scenario is, I've done a few transformations and send a result
somewhere, e.g. I write a message into a socket.  Let's say I have one
socket per a client of my streaming app and I get a host:port of that
socket as part of the message and want to send the response via that
socket.  Is foreachPartition still a better choice?








On Wed, Jul 8, 2015 at 9:51 AM, Sean Owen  wrote:

> These are quite different operations. One operates on RDDs in  DStream and
> one operates on partitions of an RDD. They are not alternatives.
>
> On Wed, Jul 8, 2015, 2:43 PM dgoldenberg  wrote:
>
>> Is there a set of best practices for when to use foreachPartition vs.
>> foreachRDD?
>>
>> Is it generally true that using foreachPartition avoids some of the
>> over-network data shuffling overhead?
>>
>> When would I definitely want to use one method vs. the other?
>>
>> Thanks.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/foreachRDD-vs-forearchPartition-tp23714.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>


Re: foreachRDD vs. forearchPartition ?

2015-07-08 Thread Dmitry Goldenberg
Thanks, Cody. The "good boy" comment wasn't from me :)  I was the one
asking for help.



On Wed, Jul 8, 2015 at 10:52 AM, Cody Koeninger  wrote:

> Sean already answered your question.  foreachRDD and foreachPartition are
> completely different, there's nothing fuzzy or insufficient about that
> answer.  The fact that you can call foreachPartition on an rdd within the
> scope of foreachRDD should tell you that they aren't in any way comparable.
>
> I'm not sure if your rudeness ("be a good boy"...really?) is intentional
> or not.  If you're asking for help from people that are in most cases
> donating their time, I'd suggest that you'll have more success with a
> little more politeness.
>
> On Wed, Jul 8, 2015 at 9:05 AM, Evo Eftimov  wrote:
>
>> That was a) fuzzy b) insufficient – one can certainly use forach (only)
>> on DStream RDDs – it works as empirical observation
>>
>>
>>
>> As another empirical observation:
>>
>>
>>
>> For each partition results in having one instance of the lambda/closure
>> per partition when e.g. publishing to output systems like message brokers,
>> databases and file systems - that increases the level of parallelism of
>> your output processing
>>
>>
>>
>> As an architect I deal with gazillions of products and don’t have time to
>> read the source code of all of them to make up for documentation
>> deficiencies. On the other hand I believe you have been involved in writing
>> some of the code so be a good boy and either answer this question properly
>> or enhance the product documentation of that area of the system
>>
>>
>>
>> *From:* Sean Owen [mailto:so...@cloudera.com]
>> *Sent:* Wednesday, July 8, 2015 2:52 PM
>> *To:* dgoldenberg; user@spark.apache.org
>> *Subject:* Re: foreachRDD vs. forearchPartition ?
>>
>>
>>
>> These are quite different operations. One operates on RDDs in  DStream
>> and one operates on partitions of an RDD. They are not alternatives.
>>
>>
>>
>> On Wed, Jul 8, 2015, 2:43 PM dgoldenberg 
>> wrote:
>>
>> Is there a set of best practices for when to use foreachPartition vs.
>> foreachRDD?
>>
>> Is it generally true that using foreachPartition avoids some of the
>> over-network data shuffling overhead?
>>
>> When would I definitely want to use one method vs. the other?
>>
>> Thanks.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/foreachRDD-vs-forearchPartition-tp23714.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: foreachRDD vs. forearchPartition ?

2015-07-08 Thread Dmitry Goldenberg
Thanks, Sean.

"are you asking about foreach vs foreachPartition? that's quite
different. foreachPartition does not give more parallelism but lets
you operate on a whole batch of data at once, which is nice if you
need to allocate some expensive resource to do the processing"

This is basically what I was looking for.


On Wed, Jul 8, 2015 at 11:15 AM, Sean Owen  wrote:

> @Evo There is no foreachRDD operation on RDDs; it is a method of
> DStream. It gives each RDD in the stream. RDD has a foreach, and
> foreachPartition. These give elements of an RDD. What do you mean it
> 'works' to call foreachRDD on an RDD?
>
> @Dmitry are you asking about foreach vs foreachPartition? that's quite
> different. foreachPartition does not give more parallelism but lets
> you operate on a whole batch of data at once, which is nice if you
> need to allocate some expensive resource to do the processing.
>
> On Wed, Jul 8, 2015 at 3:18 PM, Dmitry Goldenberg
>  wrote:
> > "These are quite different operations. One operates on RDDs in  DStream
> and
> > one operates on partitions of an RDD. They are not alternatives."
> >
> > Sean, different operations as they are, they can certainly be used on the
> > same data set.  In that sense, they are alternatives. Code can be written
> > using one or the other which reaches the same effect - likely at a
> different
> > efficiency cost.
> >
> > The question is, what are the effects of applying one vs. the other?
> >
> > My specific scenario is, I'm streaming data out of Kafka.  I want to
> perform
> > a few transformations then apply an action which results in e.g. writing
> > this data to Solr.  According to Evo, my best bet is foreachPartition
> > because of increased parallelism (which I'd need to grok to understand
> the
> > details of what that means).
> >
> > Another scenario is, I've done a few transformations and send a result
> > somewhere, e.g. I write a message into a socket.  Let's say I have one
> > socket per a client of my streaming app and I get a host:port of that
> socket
> > as part of the message and want to send the response via that socket.  Is
> > foreachPartition still a better choice?
> >
> >
> >
> >
> >
> >
> >
> >
> > On Wed, Jul 8, 2015 at 9:51 AM, Sean Owen  wrote:
> >>
> >> These are quite different operations. One operates on RDDs in  DStream
> and
> >> one operates on partitions of an RDD. They are not alternatives.
> >>
> >>
> >> On Wed, Jul 8, 2015, 2:43 PM dgoldenberg 
> wrote:
> >>>
> >>> Is there a set of best practices for when to use foreachPartition vs.
> >>> foreachRDD?
> >>>
> >>> Is it generally true that using foreachPartition avoids some of the
> >>> over-network data shuffling overhead?
> >>>
> >>> When would I definitely want to use one method vs. the other?
> >>>
> >>> Thanks.
> >>>
> >>>
> >>>
> >>> --
> >>> View this message in context:
> >>>
> http://apache-spark-user-list.1001560.n3.nabble.com/foreachRDD-vs-forearchPartition-tp23714.html
> >>> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
> >>>
> >>> -
> >>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> >>> For additional commands, e-mail: user-h...@spark.apache.org
> >>>
> >
>


Re: Best practice for using singletons on workers (seems unanswered) ?

2015-07-08 Thread Dmitry Goldenberg
Richard,
It seems whether I'm doing a foreachRDD or foreachPartition, I'm able to
create per-worker/per-JVM singletons. With 4 workers, I've got 4 singletons
created.  I wouldn't be able to use broadcast vars because the 3rd party
objects are not serializable.

The shuffling effect is basically whenever Spark has to pull data from
multiple machines together over the network when executing an action.
Probably not an issue for foreachRDD, but more for such actions as 'union'
or 'subtract' and the like.

On Wed, Jul 8, 2015 at 3:55 PM, Richard Marscher 
wrote:

> Ah, I see this is streaming. I haven't any practical experience with that
> side of Spark. But the foreachPartition idea is a good approach. I've used
> that pattern extensively, even though not for singletons, but just to
> create non-serializable objects like API and DB clients on the executor
> side. I think it's the most straightforward approach to dealing with any
> non-serializable object you need.
>
> I don't entirely follow what over-network data shuffling effects you are
> alluding to (maybe more specific to streaming?).
>
> On Wed, Jul 8, 2015 at 9:41 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> My singletons do in fact stick around. They're one per worker, looks
>> like.  So with 4 workers running on the box, we're creating one singleton
>> per worker process/jvm, which seems OK.
>>
>> Still curious about foreachPartition vs. foreachRDD though...
>>
>> On Tue, Jul 7, 2015 at 11:27 AM, Richard Marscher <
>> rmarsc...@localytics.com> wrote:
>>
>>> Would it be possible to have a wrapper class that just represents a
>>> reference to a singleton holding the 3rd party object? It could proxy over
>>> calls to the singleton object which will instantiate a private instance of
>>> the 3rd party object lazily? I think something like this might work if the
>>> workers have the singleton object in their classpath.
>>>
>>> here's a rough sketch of what I was thinking:
>>>
>>> object ThirdPartySingleton {
>>>   private lazy val thirdPartyObj = ...
>>>
>>>   def someProxyFunction() = thirdPartyObj.()
>>> }
>>>
>>> class ThirdPartyReference extends Serializable {
>>>   def someProxyFunction() = ThirdPartySingleton.someProxyFunction()
>>> }
>>>
>>> also found this SO post:
>>> http://stackoverflow.com/questions/26369916/what-is-the-right-way-to-have-a-static-object-on-all-workers
>>>
>>>
>>> On Tue, Jul 7, 2015 at 11:04 AM, dgoldenberg 
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am seeing a lot of posts on singletons vs. broadcast variables, such
>>>> as
>>>> *
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-way-to-have-some-singleton-per-worker-tt20277.html
>>>> *
>>>>
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-share-a-NonSerializable-variable-among-tasks-in-the-same-worker-node-tt11048.html#a21219
>>>>
>>>> What's the best approach to instantiate an object once and have it be
>>>> reused
>>>> by the worker(s).
>>>>
>>>> E.g. I have an object that loads some static state such as e.g. a
>>>> dictionary/map, is a part of 3rd party API and is not serializable.  I
>>>> can't
>>>> seem to get it to be a singleton on the worker side as the JVM appears
>>>> to be
>>>> wiped on every request so I get a new instance.  So the singleton
>>>> doesn't
>>>> stick.
>>>>
>>>> Is there an approach where I could have this object or a wrapper of it
>>>> be a
>>>> broadcast var? Can Kryo get me there? would that basically mean writing
>>>> a
>>>> custom serializer?  However, the 3rd party object may have a bunch of
>>>> member
>>>> vars hanging off it, so serializing it properly may be non-trivial...
>>>>
>>>> Any pointers/hints greatly appreciated.
>>>>
>>>>
>>>>
>>>> --
>>>> View this message in context:
>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Best-practice-for-using-singletons-on-workers-seems-unanswered-tp23692.html
>>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>>
>>>> -
>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>
>>>>
>>>
>>
>


Re: What is a best practice for passing environment variables to Spark workers?

2015-07-10 Thread Dmitry Goldenberg
Thanks, Akhil.

We're trying the conf.setExecutorEnv() approach since we've already got
environment variables set. For system properties we'd go the
conf.set("spark.") route.

We were concerned that doing the below type of thing did not work, which
this blog post seems to confirm (
http://progexc.blogspot.com/2014/12/spark-configuration-mess-solved.html):

$SPARK_HOME/spark-submit \
  --class "com.acme.Driver" \
  --conf spark.executorEnv.VAR1=VAL1 \
--conf spark.executorEnv.VAR2=VAL2 \
.

The code running on the workers does not see these variables.


On Fri, Jul 10, 2015 at 4:03 AM, Akhil Das 
wrote:

> It basically filters out everything which doesn't starts with "spark
> ."
> so it is necessary to keep spark. in the property name.
>
> Thanks
> Best Regards
>
> On Fri, Jul 10, 2015 at 12:06 AM, dgoldenberg 
> wrote:
>
>> I have about 20 environment variables to pass to my Spark workers. Even
>> though they're in the init scripts on the Linux box, the workers don't see
>> these variables.
>>
>> Does Spark do something to shield itself from what may be defined in the
>> environment?
>>
>> I see multiple pieces of info on how to pass the env vars into workers and
>> they seem dated and/or unclear.
>>
>> Here:
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-pass-config-variables-to-workers-tt5780.html
>>
>> SparkConf conf = new SparkConf();
>> conf.set("spark.myapp.myproperty", "propertyValue");
>> OR
>> set them in spark-defaults.conf, as in
>> spark.config.one value
>> spark.config.two value2
>>
>> In another posting,
>>
>> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-set-environment-variable-for-a-spark-job-tt3180.html
>> :
>> conf.setExecutorEnv("ORACLE_HOME", myOraHome)
>> conf.setExecutorEnv("SPARK_JAVA_OPTS",
>> "-Djava.library.path=/my/custom/path")
>>
>> The configuration guide talks about
>> "spark.executorEnv.[EnvironmentVariableName] -- Add the environment
>> variable
>> specified by EnvironmentVariableName to the Executor process. The user can
>> specify multiple of these to set multiple environment variables."
>>
>> Then there are mentions of SPARK_JAVA_OPTS which seems to be deprecated
>> (?)
>>
>> What is the easiest/cleanest approach here?  Ideally, I'd not want to
>> burden
>> my driver program with explicit knowledge of all the env vars that are
>> needed on the worker side.  I'd also like to avoid having to jam them into
>> spark-defaults.conf since they're already set in the system init scripts,
>> so
>> why duplicate.
>>
>> I suppose one approach would be to namespace all my vars to start with a
>> well-known prefix, then cycle through the env in the driver and stuff all
>> these variables into the Spark context.  If I'm doing that, would I want
>> to
>>
>> conf.set("spark.myapp.myproperty", "propertyValue");
>>
>> and is "spark." necessary? or was that just part of the example?
>>
>> or would I want to
>>
>> conf.setExecutorEnv("MYPREFIX_MY_VAR_1", "some-value");
>>
>> Thanks.
>>
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-is-a-best-practice-for-passing-environment-variables-to-Spark-workers-tp23751.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Profiling a spark job

2016-04-05 Thread Dmitry Olshansky
Hi list,

I'm curious as to what are the best practices of profiling spark apps? So far I 
tried following this guide with hprof and/or yourkit but the profile looks 
strange:
https://cwiki.apache.org/confluence/display/SPARK/Profiling+Spark+Applications+Using+YourKit

 55% of time spent in EPollWait. However I'm using standalone mode with local 
master without starting separate daemon (could it be that I should?)

---
Dmitry Olshansky
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [discuss] dropping Python 2.6 support

2016-01-10 Thread Dmitry Kniazev
Sasha, it is more complicated than that: many RHEL 6 OS utilities rely on 
Python 2.6. Upgrading it to 2.7 breaks the system. For large enterprises 
migrating to another server OS means re-certifying (re-testing) hundreds of 
applications, so yes, they do prefer to stay where they are until the benefits 
of migrating outweigh the overhead. Long story short: you cannot simply upgrade 
built-in Python 2.6 in RHEL 6 and it will take years for enterprises to migrate 
to RHEL 7.

Having said that, I don't think that it is a problem though, because Python 2.6 
and Python 2.7 can easily co-exist in the same environment. For example, we use 
virtualenv to run Spark with Python 2.7 and do not touch system Python 2.6.

Thank you,
Dmitry

09.01.2016, 06:36, "Sasha Kacanski" :
> +1
> Companies that use stock python in redhat 2.6 will need to upgrade or install 
> fresh version wich is total of 3.5 minutes so no issues ...
>
> On Tue, Jan 5, 2016 at 2:17 AM, Reynold Xin  wrote:
>> Does anybody here care about us dropping support for Python 2.6 in Spark 2.0?
>>
>> Python 2.6 is ancient, and is pretty slow in many aspects (e.g. json 
>> parsing) when compared with Python 2.7. Some libraries that Spark depend on 
>> stopped supporting 2.6. We can still convince the library maintainers to 
>> support 2.6, but it will be extra work. I'm curious if anybody still uses 
>> Python 2.6 to run Spark.
>>
>> Thanks.
>
> --
> Aleksandar Kacanski

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-11 Thread Dmitry Goldenberg
We have a bunch of Spark jobs deployed and a few large resource files such
as e.g. a dictionary for lookups or a statistical model.

Right now, these are deployed as part of the Spark jobs which will
eventually make the mongo-jars too bloated for deployments.

What are some of the best practices to consider for maintaining and sharing
large resource files like these?

Thanks.


Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-12 Thread Dmitry Goldenberg
Would it make sense to load them into Tachyon and read and broadcast them from 
there since Tachyon is already a part of the Spark stack?

If so I wonder if I could do that Tachyon read/write via a Spark API?


> On Jan 12, 2016, at 2:21 AM, Sabarish Sasidharan 
>  wrote:
> 
> One option could be to store them as blobs in a cache like Redis and then 
> read + broadcast them from the driver. Or you could store them in HDFS and 
> read + broadcast from the driver.
> 
> Regards
> Sab
> 
>> On Tue, Jan 12, 2016 at 1:44 AM, Dmitry Goldenberg 
>>  wrote:
>> We have a bunch of Spark jobs deployed and a few large resource files such 
>> as e.g. a dictionary for lookups or a statistical model.
>> 
>> Right now, these are deployed as part of the Spark jobs which will 
>> eventually make the mongo-jars too bloated for deployments.
>> 
>> What are some of the best practices to consider for maintaining and sharing 
>> large resource files like these?
>> 
>> Thanks.
> 
> 
> 
> -- 
> 
> Architect - Big Data
> Ph: +91 99805 99458
> 
> Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan 
> India ICT)
> +++


Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-12 Thread Dmitry Goldenberg
Jorn, you said Ignite or ... ? What was the second choice you were thinking of? 
It seems that got omitted.

> On Jan 12, 2016, at 2:44 AM, Jörn Franke  wrote:
> 
> You can look at ignite as a HDFS cache or for  storing rdds. 
> 
>> On 11 Jan 2016, at 21:14, Dmitry Goldenberg  wrote:
>> 
>> We have a bunch of Spark jobs deployed and a few large resource files such 
>> as e.g. a dictionary for lookups or a statistical model.
>> 
>> Right now, these are deployed as part of the Spark jobs which will 
>> eventually make the mongo-jars too bloated for deployments.
>> 
>> What are some of the best practices to consider for maintaining and sharing 
>> large resource files like these?
>> 
>> Thanks.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-12 Thread Dmitry Goldenberg
I'd guess that if the resources are broadcast Spark would put them into 
Tachyon...

> On Jan 12, 2016, at 7:04 AM, Dmitry Goldenberg  
> wrote:
> 
> Would it make sense to load them into Tachyon and read and broadcast them 
> from there since Tachyon is already a part of the Spark stack?
> 
> If so I wonder if I could do that Tachyon read/write via a Spark API?
> 
> 
>> On Jan 12, 2016, at 2:21 AM, Sabarish Sasidharan 
>>  wrote:
>> 
>> One option could be to store them as blobs in a cache like Redis and then 
>> read + broadcast them from the driver. Or you could store them in HDFS and 
>> read + broadcast from the driver.
>> 
>> Regards
>> Sab
>> 
>>> On Tue, Jan 12, 2016 at 1:44 AM, Dmitry Goldenberg 
>>>  wrote:
>>> We have a bunch of Spark jobs deployed and a few large resource files such 
>>> as e.g. a dictionary for lookups or a statistical model.
>>> 
>>> Right now, these are deployed as part of the Spark jobs which will 
>>> eventually make the mongo-jars too bloated for deployments.
>>> 
>>> What are some of the best practices to consider for maintaining and sharing 
>>> large resource files like these?
>>> 
>>> Thanks.
>> 
>> 
>> 
>> -- 
>> 
>> Architect - Big Data
>> Ph: +91 99805 99458
>> 
>> Manthan Systems | Company of the year - Analytics (2014 Frost and Sullivan 
>> India ICT)
>> +++


Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-12 Thread Dmitry Goldenberg
Thanks, Gene.

Does Spark use Tachyon under the covers anyway for implementing its
"cluster memory" support?

It seems that the practice I hear the most about is the idea of loading
resources as RDD's and then doing join's against them to achieve the lookup
effect.

The other approach would be to load the resources into broadcast variables
but I've heard concerns about memory.  Could we run out of memory if we
load too much into broadcast vars?  Is there any memory_to_disk/spill to
disk capability for broadcast variables in Spark?


On Tue, Jan 12, 2016 at 11:19 AM, Gene Pang  wrote:

> Hi Dmitry,
>
> Yes, Tachyon can help with your use case. You can read and write to
> Tachyon via the filesystem api (
> http://tachyon-project.org/documentation/File-System-API.html). There is
> a native Java API as well as a Hadoop-compatible API. Spark is also able to
> interact with Tachyon via the Hadoop-compatible API, so Spark jobs can read
> input files from Tachyon and write output files to Tachyon.
>
> I hope that helps,
> Gene
>
> On Tue, Jan 12, 2016 at 4:26 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I'd guess that if the resources are broadcast Spark would put them into
>> Tachyon...
>>
>> On Jan 12, 2016, at 7:04 AM, Dmitry Goldenberg 
>> wrote:
>>
>> Would it make sense to load them into Tachyon and read and broadcast them
>> from there since Tachyon is already a part of the Spark stack?
>>
>> If so I wonder if I could do that Tachyon read/write via a Spark API?
>>
>>
>> On Jan 12, 2016, at 2:21 AM, Sabarish Sasidharan <
>> sabarish.sasidha...@manthan.com> wrote:
>>
>> One option could be to store them as blobs in a cache like Redis and then
>> read + broadcast them from the driver. Or you could store them in HDFS and
>> read + broadcast from the driver.
>>
>> Regards
>> Sab
>>
>> On Tue, Jan 12, 2016 at 1:44 AM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> We have a bunch of Spark jobs deployed and a few large resource files
>>> such as e.g. a dictionary for lookups or a statistical model.
>>>
>>> Right now, these are deployed as part of the Spark jobs which will
>>> eventually make the mongo-jars too bloated for deployments.
>>>
>>> What are some of the best practices to consider for maintaining and
>>> sharing large resource files like these?
>>>
>>> Thanks.
>>>
>>
>>
>>
>> --
>>
>> Architect - Big Data
>> Ph: +91 99805 99458
>>
>> Manthan Systems | *Company of the year - Analytics (2014 Frost and
>> Sullivan India ICT)*
>> +++
>>
>>
>


Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-14 Thread Dmitry Goldenberg
OK so it looks like Tachyon is a cluster memory plugin marked as
"experimental" in Spark.

In any case, we've got a few requirements for the system we're working on
which may drive the decision for how to implement large resource file
management.

The system is a framework of N data analyzers which take incoming documents
as input and transform them or extract some data out of those documents.
These analyzers can be chained together which makes it a great case for
processing with RDD's and a set of map/filter types of Spark functions.
There's already an established framework API which we want to preserve.
This means that most likely, we'll create a relatively thin "binding" layer
for exposing these analyzers as well-documented functions to the end-users
who want to use them in a Spark based distributed computing environment.

We also want to, ideally, hide the complexity of how these resources are
loaded from the end-users who will be writing the actual Spark jobs that
utilize the Spark "binding" functions that we provide.

So, for managing large numbers of small, medium, or large resource files,
we're considering the below options, with a variety of pros and cons
attached, from the following perspectives:

a) persistence - where do the resources reside initially;
b) loading - what are the mechanics for loading of these resources;
c) caching and sharing across worker nodes.

Possible options:

1. Load each resource into a broadcast variable. Considering that we have
scores if not hundreds of these resource files, maintaining that many
broadcast variables seems like a complexity that's going to be hard to
manage. We'd also need a translation layer between the broadcast variables
and the internal API that would want to "speak" InputStream's rather than
broadcast variables.

2. Load resources into RDD's and perform join's against them from our
incoming document data RDD's, thus achieving the effect of a value lookup
from the resources.  While this seems like a very Spark'y way of doing
things, the lookup mechanics seem quite non-trivial, especially because
some of the resources aren't going to be pure dictionaries; they may be
statistical models.  Additionally, this forces us to utilize Spark's
semantics for handling of these resources which means a potential rewrite
of our internal product API. That would be a hard option to go with.

3. Pre-install all the needed resources on each of the worker nodes;
retrieve the needed resources from the file system and load them into
memory as needed. Ideally, the resources would only be installed once, on
the Spark driver side; we'd want to avoid having to pre-install all these
files on each node. However, we've done this as an exercise and this
approach works OK.

4. Pre-load all the resources into HDFS or S3 i.e. into some distributed
persistent store; load them into cluster memory from there, as necessary.
Presumably this could be a pluggable store with a common API exposed.
Since our framework is an OEM'able product, we could plug and play with a
variety of such persistent stores via Java's FileSystem/URL scheme handler
API's.

5. Implement a Resource management server, with a RESTful interface on top.
Under the covers, this could be a wrapper on top of #4.  Potentially
unnecessary if we have a solid persistent store API as per #4.

6. Beyond persistence, caching also has to be considered for these
resources. We've considered Tachyon (especially since it's pluggable into
Spark), Redis, and the like. Ideally, I would think we'd want resources to
be loaded into the cluster memory as needed; paged in/out on-demand in an
LRU fashion.  From this perspective, it's not yet clear to me what the best
option(s) would be. Any thoughts / recommendations would be appreciated.





On Tue, Jan 12, 2016 at 3:04 PM, Dmitry Goldenberg  wrote:

> Thanks, Gene.
>
> Does Spark use Tachyon under the covers anyway for implementing its
> "cluster memory" support?
>
> It seems that the practice I hear the most about is the idea of loading
> resources as RDD's and then doing join's against them to achieve the lookup
> effect.
>
> The other approach would be to load the resources into broadcast variables
> but I've heard concerns about memory.  Could we run out of memory if we
> load too much into broadcast vars?  Is there any memory_to_disk/spill to
> disk capability for broadcast variables in Spark?
>
>
> On Tue, Jan 12, 2016 at 11:19 AM, Gene Pang  wrote:
>
>> Hi Dmitry,
>>
>> Yes, Tachyon can help with your use case. You can read and write to
>> Tachyon via the filesystem api (
>> http://tachyon-project.org/documentation/File-System-API.html). There is
>> a native Java API as well as a Hadoop-compatible API. Spark is al

Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-14 Thread Dmitry Goldenberg
The other thing from some folks' recommendations on this list was Apache
Ignite.  Their In-Memory File System (
https://ignite.apache.org/features/igfs.html) looks quite interesting.

On Thu, Jan 14, 2016 at 7:54 AM, Dmitry Goldenberg  wrote:

> OK so it looks like Tachyon is a cluster memory plugin marked as
> "experimental" in Spark.
>
> In any case, we've got a few requirements for the system we're working on
> which may drive the decision for how to implement large resource file
> management.
>
> The system is a framework of N data analyzers which take incoming
> documents as input and transform them or extract some data out of those
> documents.  These analyzers can be chained together which makes it a great
> case for processing with RDD's and a set of map/filter types of Spark
> functions. There's already an established framework API which we want to
> preserve.  This means that most likely, we'll create a relatively thin
> "binding" layer for exposing these analyzers as well-documented functions
> to the end-users who want to use them in a Spark based distributed
> computing environment.
>
> We also want to, ideally, hide the complexity of how these resources are
> loaded from the end-users who will be writing the actual Spark jobs that
> utilize the Spark "binding" functions that we provide.
>
> So, for managing large numbers of small, medium, or large resource files,
> we're considering the below options, with a variety of pros and cons
> attached, from the following perspectives:
>
> a) persistence - where do the resources reside initially;
> b) loading - what are the mechanics for loading of these resources;
> c) caching and sharing across worker nodes.
>
> Possible options:
>
> 1. Load each resource into a broadcast variable. Considering that we have
> scores if not hundreds of these resource files, maintaining that many
> broadcast variables seems like a complexity that's going to be hard to
> manage. We'd also need a translation layer between the broadcast variables
> and the internal API that would want to "speak" InputStream's rather than
> broadcast variables.
>
> 2. Load resources into RDD's and perform join's against them from our
> incoming document data RDD's, thus achieving the effect of a value lookup
> from the resources.  While this seems like a very Spark'y way of doing
> things, the lookup mechanics seem quite non-trivial, especially because
> some of the resources aren't going to be pure dictionaries; they may be
> statistical models.  Additionally, this forces us to utilize Spark's
> semantics for handling of these resources which means a potential rewrite
> of our internal product API. That would be a hard option to go with.
>
> 3. Pre-install all the needed resources on each of the worker nodes;
> retrieve the needed resources from the file system and load them into
> memory as needed. Ideally, the resources would only be installed once, on
> the Spark driver side; we'd want to avoid having to pre-install all these
> files on each node. However, we've done this as an exercise and this
> approach works OK.
>
> 4. Pre-load all the resources into HDFS or S3 i.e. into some distributed
> persistent store; load them into cluster memory from there, as necessary.
> Presumably this could be a pluggable store with a common API exposed.
> Since our framework is an OEM'able product, we could plug and play with a
> variety of such persistent stores via Java's FileSystem/URL scheme handler
> API's.
>
> 5. Implement a Resource management server, with a RESTful interface on
> top. Under the covers, this could be a wrapper on top of #4.  Potentially
> unnecessary if we have a solid persistent store API as per #4.
>
> 6. Beyond persistence, caching also has to be considered for these
> resources. We've considered Tachyon (especially since it's pluggable into
> Spark), Redis, and the like. Ideally, I would think we'd want resources to
> be loaded into the cluster memory as needed; paged in/out on-demand in an
> LRU fashion.  From this perspective, it's not yet clear to me what the best
> option(s) would be. Any thoughts / recommendations would be appreciated.
>
>
>
>
>
> On Tue, Jan 12, 2016 at 3:04 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Thanks, Gene.
>>
>> Does Spark use Tachyon under the covers anyway for implementing its
>> "cluster memory" support?
>>
>> It seems that the practice I hear the most about is the idea of loading
>> resources as RDD's and then doing join's against them to achieve the lookup

Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
I apologize for posting this Kafka related issue into the Spark list. Have
gotten no responses on the Kafka list and was hoping someone on this list
could shed some light on the below.


---

We're running into this issue in a clustered environment where we're trying
to send messages to Kafka and are getting the below error.

Can someone explain what might be causing it and what the error message
means (Failed to send data since partitions [,8] don't have a
leader) ?

---

WARN kafka.producer.BrokerPartitionInfo: Error while fetching
metadata partition 10 leader: none replicas: isr: isUnderReplicated: false
for topic partition [,10]: [class
kafka.common.LeaderNotAvailableException]

ERROR kafka.producer.async.DefaultEventHandler: Failed to send requests for
topics  with correlation ids in [2398792,2398801]

ERROR com.acme.core.messaging.kafka.KafkaMessageProducer: Error while
sending a message to the message
store. kafka.common.FailedToSendMessageException: Failed to send messages
after 3 tries.
at
kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
~[kafka_2.10-0.8.2.0.jar:?]
at kafka.producer.Producer.send(Producer.scala:77)
~[kafka_2.10-0.8.2.0.jar:?]
at kafka.javaapi.producer.Producer.send(Producer.scala:33)
~[kafka_2.10-0.8.2.0.jar:?]

WARN kafka.producer.async.DefaultEventHandler: Failed to send data since
partitions [,8] don't have a leader

What do these errors and warnings mean and how do we get around them?

---

The code for sending messages is basically as follows:

public class KafkaMessageProducer {
private Producer producer;

.

public void sendMessage(String topic, String key,
String message) throws IOException, MessagingException {
KeyedMessage data = new KeyedMessage(topic, key, message);
try {
  producer.send(data);
} catch (Exception ex) {
  throw new MessagingException("Error while sending a message to the
message store.", ex);
}
}

Is it possible that the producer gets "stale" and needs to be
re-initialized?  Do we want to re-create the producer on every message (??)
or is it OK to hold on to one indefinitely?

---

The following are the producer properties that are being set into the
producer

batch.num.messages => 200
client.id => Acme
compression.codec => none
key.serializer.class => kafka.serializer.StringEncoder
message.send.max.retries => 3
metadata.broker.list => data2.acme.com:9092,data3.acme.com:9092
partitioner.class => kafka.producer.DefaultPartitioner
producer.type => sync
queue.buffering.max.messages => 1
queue.buffering.max.ms => 5000
queue.enqueue.timeout.ms => -1
request.required.acks => 1
request.timeout.ms => 1
retry.backoff.ms => 1000
send.buffer.bytes => 102400
serializer.class => kafka.serializer.StringEncoder
topic.metadata.refresh.interval.ms => 60


Thanks.


Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
Adrian,

Thanks for your response. I just looked at both machines we're testing on
and on both the Kafka server process looks OK. Anything specific I can
check otherwise?

>From googling around, I see some posts where folks suggest to check the DNS
settings (those appear fine) and to set the advertised.host.name in Kafka's
server.properties. Yay/nay?

Thanks again.

On Tue, Sep 29, 2015 at 8:31 AM, Adrian Tanase  wrote:

> I believe some of the brokers in your cluster died and there are a number
> of partitions that nobody is currently managing.
>
> -adrian
>
> From: Dmitry Goldenberg
> Date: Tuesday, September 29, 2015 at 3:26 PM
> To: "user@spark.apache.org"
> Subject: Kafka error "partitions don't have a leader" /
> LeaderNotAvailableException
>
> I apologize for posting this Kafka related issue into the Spark list. Have
> gotten no responses on the Kafka list and was hoping someone on this list
> could shed some light on the below.
>
> 
> ---
>
> We're running into this issue in a clustered environment where we're
> trying to send messages to Kafka and are getting the below error.
>
> Can someone explain what might be causing it and what the error message
> means (Failed to send data since partitions [,8] don't have a
> leader) ?
>
>
> ---
>
> WARN kafka.producer.BrokerPartitionInfo: Error while fetching
> metadata partition 10 leader: none replicas: isr: isUnderReplicated: false
> for topic partition [,10]: [class
> kafka.common.LeaderNotAvailableException]
>
> ERROR kafka.producer.async.DefaultEventHandler: Failed to send requests
> for topics  with correlation ids in [2398792,2398801]
>
> ERROR com.acme.core.messaging.kafka.KafkaMessageProducer: Error while
> sending a message to the message
> store. kafka.common.FailedToSendMessageException: Failed to send messages
> after 3 tries.
> at
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
> ~[kafka_2.10-0.8.2.0.jar:?]
> at kafka.producer.Producer.send(Producer.scala:77)
> ~[kafka_2.10-0.8.2.0.jar:?]
> at kafka.javaapi.producer.Producer.send(Producer.scala:33)
> ~[kafka_2.10-0.8.2.0.jar:?]
>
> WARN kafka.producer.async.DefaultEventHandler: Failed to send data since
> partitions [,8] don't have a leader
>
> What do these errors and warnings mean and how do we get around them?
>
>
> ---
>
> The code for sending messages is basically as follows:
>
> public class KafkaMessageProducer {
> private Producer producer;
>
> .
>
> public void sendMessage(String topic, String key,
> String message) throws IOException, MessagingException {
> KeyedMessage data = new KeyedMessage String>(topic, key, message);
> try {
>   producer.send(data);
> } catch (Exception ex) {
>   throw new MessagingException("Error while sending a message to the
> message store.", ex);
> }
> }
>
> Is it possible that the producer gets "stale" and needs to be
> re-initialized?  Do we want to re-create the producer on every message (??)
> or is it OK to hold on to one indefinitely?
>
>
> ---
>
> The following are the producer properties that are being set into the
> producer
>
> batch.num.messages => 200
> client.id => Acme
> compression.codec => none
> key.serializer.class => kafka.serializer.StringEncoder
> message.send.max.retries => 3
> metadata.broker.list => data2.acme.com:9092,data3.acme.com:9092
> partitioner.class => kafka.producer.DefaultPartitioner
> producer.type => sync
> queue.buffering.max.messages => 1
> queue.buffering.max.ms => 5000
> queue.enqueue.timeout.ms => -1
> request.required.acks => 1
> request.timeout.ms => 1
> retry.backoff.ms => 1000
> send.buffer.bytes => 102400
> serializer.class => kafka.serializer.StringEncoder
> topic.metadata.refresh.interval.ms => 60
>
>
> Thanks.
>


Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
"more partitions and replicas than available brokers" -- what would be a
good ratio?

We've been trying to set up 3 topics with 64 partitions.  I'm including the
output of "bin/kafka-topics.sh --zookeeper localhost:2181 --describe
topic1" below.

I think it's symptomatic and confirms your theory, Adrian, that we've got
too many partitions. In fact, for topic 2, only 12 partitions appear to
have been created despite the requested 64.  Does Kafka have the limit of
140 partitions total within a cluster?

The doc doesn't appear to have any prescriptions as to how you go about
calculating an optimal number of partitions.

We'll definitely try with fewer, I'm just looking for a good formula to
calculate how many. And no, Adrian, this hasn't worked yet, so we'll start
with something like 12 partitions.  It'd be good to know how high we can go
with that...

Topic:topic1 PartitionCount:64 ReplicationFactor:1 Configs:

Topic: topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1

Topic: topic2 Partition: 1 Leader: 2 Replicas: 2 Isr: 2




Topic: topic3 Partition: 63 Leader: 2 Replicas: 2 Isr: 2

---

Topic:topic2 PartitionCount:12 ReplicationFactor:1 Configs:

Topic: topic2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2

Topic: topic2 Partition: 1 Leader: 1 Replicas: 1 Isr: 1




Topic: topic2 Partition: 11 Leader: 1 Replicas: 1 Isr: 1

---

Topic:topic3 PartitionCount:64 ReplicationFactor:1 Configs:

Topic: topic3 Partition: 0 Leader: 2 Replicas: 2 Isr: 2

Topic: topic3 Partition: 1 Leader: 1 Replicas: 1 Isr: 1




Topic: topic3 Partition: 63 Leader: 1 Replicas: 1 Isr: 1


On Tue, Sep 29, 2015 at 8:47 AM, Adrian Tanase  wrote:

> The error message is very explicit (partition is under replicated), I
> don’t think it’s related to networking issues.
>
> Try to run /home/kafka/bin/kafka-topics.sh —zookeeper localhost/kafka
> —describe topic_name and see which brokers are missing from the replica
> assignment.
> *(replace home, zk-quorum etc with your own set-up)*
>
> Lastly, has this ever worked? Maybe you’ve accidentally created the topic
> with more partitions and replicas than available brokers… try to recreate
> with fewer partitions/replicas, see if it works.
>
> -adrian
>
> From: Dmitry Goldenberg
> Date: Tuesday, September 29, 2015 at 3:37 PM
> To: Adrian Tanase
> Cc: "user@spark.apache.org"
> Subject: Re: Kafka error "partitions don't have a leader" /
> LeaderNotAvailableException
>
> Adrian,
>
> Thanks for your response. I just looked at both machines we're testing on
> and on both the Kafka server process looks OK. Anything specific I can
> check otherwise?
>
> From googling around, I see some posts where folks suggest to check the
> DNS settings (those appear fine) and to set the advertised.host.name in
> Kafka's server.properties. Yay/nay?
>
> Thanks again.
>
> On Tue, Sep 29, 2015 at 8:31 AM, Adrian Tanase  wrote:
>
>> I believe some of the brokers in your cluster died and there are a number
>> of partitions that nobody is currently managing.
>>
>> -adrian
>>
>> From: Dmitry Goldenberg
>> Date: Tuesday, September 29, 2015 at 3:26 PM
>> To: "user@spark.apache.org"
>> Subject: Kafka error "partitions don't have a leader" /
>> LeaderNotAvailableException
>>
>> I apologize for posting this Kafka related issue into the Spark list.
>> Have gotten no responses on the Kafka list and was hoping someone on this
>> list could shed some light on the below.
>>
>> 
>> ---
>>
>> We're running into this issue in a clustered environment where we're
>> trying to send messages to Kafka and are getting the below error.
>>
>> Can someone explain what might be causing it and what the error message
>> means (Failed to send data since partitions [,8] don't have a
>> leader) ?
>>
>>
>> ---
>>
>> WARN kafka.producer.BrokerPartitionInfo: Error while fetching
>> metadata partition 10 leader: none replicas: isr: isUnderReplicated: false
>> for topic partition [,10]: [class
>>

Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
Thanks, Cody.

Yes we did see that writeup from Jay, it seems to just refer to his test 6
partitions.  I've been looking for more of a recipe of what the possible
max is vs. what the optimal value may be; haven't found such.

KAFKA-899 appears related but it was fixed in Kafka 0.8.2.0 - we're running
0.8.2.1.

I'm more curious about another error message from the logs which is this:

*fetching topic metadata for topics [Set(my-topic-1)] from broker
[ArrayBuffer(id:0,host:data2.acme.com <http://data2.acme.com>,port:9092,
id:1,host:data3.acme.com <http://data3.acme.com>,port:9092)] failed*

I know that data2 should have broker ID of 1 and data3 should have broker
ID of 2.  So there's some disconnect somewhere as to what these ID's are.
In Zookeeper, ls /brokers/ids lists: [1, 2].  So where could the [0, 1] be
stuck?



On Tue, Sep 29, 2015 at 9:39 AM, Cody Koeninger  wrote:

> Try writing and reading to the topics in question using the kafka command
> line tools, to eliminate your code as a variable.
>
>
> That number of partitions is probably more than sufficient:
>
>
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> Obviously if you ask for more replicas than you have brokers you're going
> to have a problem, but that doesn't seem to be the case.
>
>
>
> Also, depending on what version of kafka you're using on the broker, you
> may want to look through the kafka jira, e.g.
>
> https://issues.apache.org/jira/browse/KAFKA-899
>
>
> On Tue, Sep 29, 2015 at 8:05 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> "more partitions and replicas than available brokers" -- what would be a
>> good ratio?
>>
>> We've been trying to set up 3 topics with 64 partitions.  I'm including
>> the output of "bin/kafka-topics.sh --zookeeper localhost:2181 --describe
>> topic1" below.
>>
>> I think it's symptomatic and confirms your theory, Adrian, that we've got
>> too many partitions. In fact, for topic 2, only 12 partitions appear to
>> have been created despite the requested 64.  Does Kafka have the limit of
>> 140 partitions total within a cluster?
>>
>> The doc doesn't appear to have any prescriptions as to how you go about
>> calculating an optimal number of partitions.
>>
>> We'll definitely try with fewer, I'm just looking for a good formula to
>> calculate how many. And no, Adrian, this hasn't worked yet, so we'll start
>> with something like 12 partitions.  It'd be good to know how high we can go
>> with that...
>>
>> Topic:topic1 PartitionCount:64 ReplicationFactor:1 Configs:
>>
>> Topic: topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
>>
>> Topic: topic2 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
>>
>>
>> 
>>
>> Topic: topic3 Partition: 63 Leader: 2 Replicas: 2 Isr: 2
>>
>>
>> ---
>>
>> Topic:topic2 PartitionCount:12 ReplicationFactor:1 Configs:
>>
>> Topic: topic2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>>
>> Topic: topic2 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> 
>>
>> Topic: topic2 Partition: 11 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> ---
>>
>> Topic:topic3 PartitionCount:64 ReplicationFactor:1 Configs:
>>
>> Topic: topic3 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>>
>> Topic: topic3 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> 
>>
>> Topic: topic3 Partition: 63 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> On Tue, Sep 29, 2015 at 8:47 AM, Adrian Tanase  wrote:
>>
>>> The error message is very explicit (partition is under replicated), I
>>> don’t think it’s related to networking issues.
>>>
>>> Try to run /home/kafka/bin/kafka-topics.sh —zookeeper localhost/kafka
>>> —describe topic_name and see which brokers are missing from the replica
>>> assignment.
>>> *(replace home, zk-quorum etc with your own set-up)*
>>>
>>> Lastly, has this ever worked? Maybe you’ve accidentally created the
>>> topic with mo

Re: Kafka error "partitions don't have a leader" / LeaderNotAvailableException

2015-09-29 Thread Dmitry Goldenberg
We've got Kafka working generally. Can definitely write to it now.

There was a snag where num.partitions was set to 12 on one node but to 64
on the other.  We fixed this and set num.partitions to 42 and things are
working on that side.






On Tue, Sep 29, 2015 at 9:39 AM, Cody Koeninger  wrote:

> Try writing and reading to the topics in question using the kafka command
> line tools, to eliminate your code as a variable.
>
>
> That number of partitions is probably more than sufficient:
>
>
> https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines
>
> Obviously if you ask for more replicas than you have brokers you're going
> to have a problem, but that doesn't seem to be the case.
>
>
>
> Also, depending on what version of kafka you're using on the broker, you
> may want to look through the kafka jira, e.g.
>
> https://issues.apache.org/jira/browse/KAFKA-899
>
>
> On Tue, Sep 29, 2015 at 8:05 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> "more partitions and replicas than available brokers" -- what would be a
>> good ratio?
>>
>> We've been trying to set up 3 topics with 64 partitions.  I'm including
>> the output of "bin/kafka-topics.sh --zookeeper localhost:2181 --describe
>> topic1" below.
>>
>> I think it's symptomatic and confirms your theory, Adrian, that we've got
>> too many partitions. In fact, for topic 2, only 12 partitions appear to
>> have been created despite the requested 64.  Does Kafka have the limit of
>> 140 partitions total within a cluster?
>>
>> The doc doesn't appear to have any prescriptions as to how you go about
>> calculating an optimal number of partitions.
>>
>> We'll definitely try with fewer, I'm just looking for a good formula to
>> calculate how many. And no, Adrian, this hasn't worked yet, so we'll start
>> with something like 12 partitions.  It'd be good to know how high we can go
>> with that...
>>
>> Topic:topic1 PartitionCount:64 ReplicationFactor:1 Configs:
>>
>> Topic: topic1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
>>
>> Topic: topic2 Partition: 1 Leader: 2 Replicas: 2 Isr: 2
>>
>>
>> 
>>
>> Topic: topic3 Partition: 63 Leader: 2 Replicas: 2 Isr: 2
>>
>>
>> ---
>>
>> Topic:topic2 PartitionCount:12 ReplicationFactor:1 Configs:
>>
>> Topic: topic2 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>>
>> Topic: topic2 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> 
>>
>> Topic: topic2 Partition: 11 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> ---
>>
>> Topic:topic3 PartitionCount:64 ReplicationFactor:1 Configs:
>>
>> Topic: topic3 Partition: 0 Leader: 2 Replicas: 2 Isr: 2
>>
>> Topic: topic3 Partition: 1 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> 
>>
>> Topic: topic3 Partition: 63 Leader: 1 Replicas: 1 Isr: 1
>>
>>
>> On Tue, Sep 29, 2015 at 8:47 AM, Adrian Tanase  wrote:
>>
>>> The error message is very explicit (partition is under replicated), I
>>> don’t think it’s related to networking issues.
>>>
>>> Try to run /home/kafka/bin/kafka-topics.sh —zookeeper localhost/kafka
>>> —describe topic_name and see which brokers are missing from the replica
>>> assignment.
>>> *(replace home, zk-quorum etc with your own set-up)*
>>>
>>> Lastly, has this ever worked? Maybe you’ve accidentally created the
>>> topic with more partitions and replicas than available brokers… try to
>>> recreate with fewer partitions/replicas, see if it works.
>>>
>>> -adrian
>>>
>>> From: Dmitry Goldenberg
>>> Date: Tuesday, September 29, 2015 at 3:37 PM
>>> To: Adrian Tanase
>>> Cc: "user@spark.apache.org"
>>> Subject: Re: Kafka error "partitions don't have a leader" /
>>> LeaderNotAvailableException
>>>
>>> Adrian,
>>>
>>> Thanks for your response. I just looked at both machines we're testing
>&

ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Dmitry Goldenberg
We're seeing this occasionally. Granted, this was caused by a wrinkle in
the Solr schema but this bubbled up all the way in Spark and caused job
failures.

I just checked and SolrException class is actually in the consumer job jar
we use.  Is there any reason why Spark cannot find the SolrException class?

15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception could
not be deserialized
java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at
org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply$mcV$sp(TaskResultGetter.scala:108)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3$$anonfun$run$2.apply(TaskResultGetter.scala:105)
at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1699)
at
org.apache.spark.scheduler.TaskResultGetter$$anon$3.run(TaskResultGetter.scala:105)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Dmitry Goldenberg
Release of Spark: 1.5.0.

Command line invokation:

ACME_INGEST_HOME=/mnt/acme/acme-ingest
ACME_INGEST_VERSION=0.0.1-SNAPSHOT
ACME_BATCH_DURATION_MILLIS=5000
SPARK_MASTER_URL=spark://data1:7077
JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"

$SPARK_HOME/bin/spark-submit \
--driver-class-path  $ACME_INGEST_HOME \
--driver-java-options "$JAVA_OPTIONS" \
--class "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver" \
--master $SPARK_MASTER_URL  \
--conf
"spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
\

$ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
-brokerlist $METADATA_BROKER_LIST \
-topic acme.topic1 \
-autooffsetreset largest \
-batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
-appname Acme.App1 \
-checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
Note that SolrException is definitely in our consumer jar
acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
$ACME_INGEST_HOME.

For the extraClassPath on the executors, we've got additionally
hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
Spark jobs to communicate with HBase.  The only way to force Phoenix to
successfully communicate with HBase was to have that JAR explicitly added
to the executor classpath regardless of the fact that the contents of the
hbase-protocol hadoop jar get rolled up into the consumer jar at build time.

I'm starting to wonder whether there's some class loading pattern here
where some classes may not get loaded out of the consumer jar and therefore
have to have their respective jars added to the executor extraClassPath?

Or is this a serialization problem for SolrException as Divya Ravichandran
suggested?




On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:

> Mind providing a bit more information:
>
> release of Spark
> command line for running Spark job
>
> Cheers
>
> On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> We're seeing this occasionally. Granted, this was caused by a wrinkle in
>> the Solr schema but this bubbled up all the way in Spark and caused job
>> failures.
>>
>> I just checked and SolrException class is actually in the consumer job
>> jar we use.  Is there any reason why Spark cannot find the SolrException
>> class?
>>
>> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception
>> could not be deserialized
>> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>> at
>> org.apache.spark.ThrowableSerializationWrapper.readObject(TaskEndReason.scala:163)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:497)
>> at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
>> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.

Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Dmitry Goldenberg
Ted, I think I have tried these settings with the hbase protocol jar, to no
avail.

I'm going to see if I can try and use these with this SolrException issue
though it now may be harder to reproduce it. Thanks for the suggestion.

On Tue, Sep 29, 2015 at 8:03 PM, Ted Yu  wrote:

> Have you tried the following ?
> --conf spark.driver.userClassPathFirst=true --conf spark.executor.
> userClassPathFirst=true
>
> On Tue, Sep 29, 2015 at 4:38 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Release of Spark: 1.5.0.
>>
>> Command line invokation:
>>
>> ACME_INGEST_HOME=/mnt/acme/acme-ingest
>> ACME_INGEST_VERSION=0.0.1-SNAPSHOT
>> ACME_BATCH_DURATION_MILLIS=5000
>> SPARK_MASTER_URL=spark://data1:7077
>> JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
>> JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"
>>
>> $SPARK_HOME/bin/spark-submit \
>> --driver-class-path  $ACME_INGEST_HOME \
>> --driver-java-options "$JAVA_OPTIONS" \
>> --class "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver"
>> \
>> --master $SPARK_MASTER_URL  \
>> --conf
>> "spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
>> \
>>
>> $ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
>> -brokerlist $METADATA_BROKER_LIST \
>> -topic acme.topic1 \
>> -autooffsetreset largest \
>> -batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
>> -appname Acme.App1 \
>> -checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
>> Note that SolrException is definitely in our consumer jar
>> acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
>> $ACME_INGEST_HOME.
>>
>> For the extraClassPath on the executors, we've got additionally
>> hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
>> Spark jobs to communicate with HBase.  The only way to force Phoenix to
>> successfully communicate with HBase was to have that JAR explicitly added
>> to the executor classpath regardless of the fact that the contents of the
>> hbase-protocol hadoop jar get rolled up into the consumer jar at build time.
>>
>> I'm starting to wonder whether there's some class loading pattern here
>> where some classes may not get loaded out of the consumer jar and therefore
>> have to have their respective jars added to the executor extraClassPath?
>>
>> Or is this a serialization problem for SolrException as Divya
>> Ravichandran suggested?
>>
>>
>>
>>
>> On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:
>>
>>> Mind providing a bit more information:
>>>
>>> release of Spark
>>> command line for running Spark job
>>>
>>> Cheers
>>>
>>> On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> We're seeing this occasionally. Granted, this was caused by a wrinkle
>>>> in the Solr schema but this bubbled up all the way in Spark and caused job
>>>> failures.
>>>>
>>>> I just checked and SolrException class is actually in the consumer job
>>>> jar we use.  Is there any reason why Spark cannot find the SolrException
>>>> class?
>>>>
>>>> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception
>>>> could not be deserialized
>>>> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>> at java.lang.Class.forName0(Native Method)
>>>> at java.lang.Class.forName(Class.java:348)
>>>> at
>>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
>>>> at
>>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>>> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>>> at
>>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>>> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>

Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-29 Thread Dmitry Goldenberg
I'm actually not sure how either one of these would possibly cause Spark to
find SolrException. Whether the driver or executor class path is first,
should it not matter, if the class is in the consumer job jar?




On Tue, Sep 29, 2015 at 9:12 PM, Dmitry Goldenberg  wrote:

> Ted, I think I have tried these settings with the hbase protocol jar, to
> no avail.
>
> I'm going to see if I can try and use these with this SolrException issue
> though it now may be harder to reproduce it. Thanks for the suggestion.
>
> On Tue, Sep 29, 2015 at 8:03 PM, Ted Yu  wrote:
>
>> Have you tried the following ?
>> --conf spark.driver.userClassPathFirst=true --conf spark.executor.
>> userClassPathFirst=true
>>
>> On Tue, Sep 29, 2015 at 4:38 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Release of Spark: 1.5.0.
>>>
>>> Command line invokation:
>>>
>>> ACME_INGEST_HOME=/mnt/acme/acme-ingest
>>> ACME_INGEST_VERSION=0.0.1-SNAPSHOT
>>> ACME_BATCH_DURATION_MILLIS=5000
>>> SPARK_MASTER_URL=spark://data1:7077
>>> JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
>>> JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"
>>>
>>> $SPARK_HOME/bin/spark-submit \
>>> --driver-class-path  $ACME_INGEST_HOME \
>>> --driver-java-options "$JAVA_OPTIONS" \
>>> --class
>>> "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver" \
>>> --master $SPARK_MASTER_URL  \
>>> --conf
>>> "spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
>>> \
>>>
>>> $ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
>>> -brokerlist $METADATA_BROKER_LIST \
>>> -topic acme.topic1 \
>>> -autooffsetreset largest \
>>> -batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
>>> -appname Acme.App1 \
>>> -checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
>>> Note that SolrException is definitely in our consumer jar
>>> acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
>>> $ACME_INGEST_HOME.
>>>
>>> For the extraClassPath on the executors, we've got additionally
>>> hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
>>> Spark jobs to communicate with HBase.  The only way to force Phoenix to
>>> successfully communicate with HBase was to have that JAR explicitly added
>>> to the executor classpath regardless of the fact that the contents of the
>>> hbase-protocol hadoop jar get rolled up into the consumer jar at build time.
>>>
>>> I'm starting to wonder whether there's some class loading pattern here
>>> where some classes may not get loaded out of the consumer jar and therefore
>>> have to have their respective jars added to the executor extraClassPath?
>>>
>>> Or is this a serialization problem for SolrException as Divya
>>> Ravichandran suggested?
>>>
>>>
>>>
>>>
>>> On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:
>>>
>>>> Mind providing a bit more information:
>>>>
>>>> release of Spark
>>>> command line for running Spark job
>>>>
>>>> Cheers
>>>>
>>>> On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
>>>> dgoldenberg...@gmail.com> wrote:
>>>>
>>>>> We're seeing this occasionally. Granted, this was caused by a wrinkle
>>>>> in the Solr schema but this bubbled up all the way in Spark and caused job
>>>>> failures.
>>>>>
>>>>> I just checked and SolrException class is actually in the consumer job
>>>>> jar we use.  Is there any reason why Spark cannot find the SolrException
>>>>> class?
>>>>>
>>>>> 15/09/29 15:41:58 WARN ThrowableSerializationWrapper: Task exception
>>>>> could not be deserialized
>>>>> java.lang.ClassNotFoundException: org.apache.solr.common.SolrException
>>>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>>>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>>>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>>>>

How to tell Spark not to use /tmp for snappy-unknown-***-libsnappyjava.so

2015-09-30 Thread Dmitry Goldenberg
Is there a way to ensure Spark doesn't write to /tmp directory?

We've got spark.local.dir specified in the spark-defaults.conf file to
point at another directory.  But we're seeing many of
these snappy-unknown-***-libsnappyjava.so files being written to /tmp still.

Is there a config setting or something that would cause Spark to use
another directory of our choosing?

Thanks.


Re: How to tell Spark not to use /tmp for snappy-unknown-***-libsnappyjava.so

2015-09-30 Thread Dmitry Goldenberg
Thanks, Ted, will try it out.

On Wed, Sep 30, 2015 at 9:07 AM, Ted Yu  wrote:

> See the tail of this:
> https://bugzilla.redhat.com/show_bug.cgi?id=1005811
>
> FYI
>
> > On Sep 30, 2015, at 5:54 AM, Dmitry Goldenberg 
> wrote:
> >
> > Is there a way to ensure Spark doesn't write to /tmp directory?
> >
> > We've got spark.local.dir specified in the spark-defaults.conf file to
> point at another directory.  But we're seeing many of these
> snappy-unknown-***-libsnappyjava.so files being written to /tmp still.
> >
> > Is there a config setting or something that would cause Spark to use
> another directory of our choosing?
> >
> > Thanks.
>


Re: ThrowableSerializationWrapper: Task exception could not be deserialized / ClassNotFoundException: org.apache.solr.common.SolrException

2015-09-30 Thread Dmitry Goldenberg
I believe I've had trouble with --conf spark.driver.userClassPathFirst=true
--conf spark.executor.userClassPathFirst=true before, so these might not
work...

I was thinking of trying to add the solr4j jar to
spark.executor.extraClassPath...

On Wed, Sep 30, 2015 at 12:01 PM, Ted Yu  wrote:

> bq. have tried these settings with the hbase protocol jar, to no avail
>
> In that case, HBaseZeroCopyByteString is contained in hbase-protocol.jar.
> In HBaseZeroCopyByteString , you can see:
>
> package com.google.protobuf;  // This is a lie.
>
> If protobuf jar is loaded ahead of hbase-protocol.jar, things start to get
> interesting ...
>
> On Tue, Sep 29, 2015 at 6:12 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Ted, I think I have tried these settings with the hbase protocol jar, to
>> no avail.
>>
>> I'm going to see if I can try and use these with this SolrException issue
>> though it now may be harder to reproduce it. Thanks for the suggestion.
>>
>> On Tue, Sep 29, 2015 at 8:03 PM, Ted Yu  wrote:
>>
>>> Have you tried the following ?
>>> --conf spark.driver.userClassPathFirst=true --conf spark.executor.
>>> userClassPathFirst=true
>>>
>>> On Tue, Sep 29, 2015 at 4:38 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> Release of Spark: 1.5.0.
>>>>
>>>> Command line invokation:
>>>>
>>>> ACME_INGEST_HOME=/mnt/acme/acme-ingest
>>>> ACME_INGEST_VERSION=0.0.1-SNAPSHOT
>>>> ACME_BATCH_DURATION_MILLIS=5000
>>>> SPARK_MASTER_URL=spark://data1:7077
>>>> JAVA_OPTIONS="-Dspark.streaming.kafka.maxRatePerPartition=1000"
>>>> JAVA_OPTIONS="$JAVA_OPTIONS -Dspark.executor.memory=2g"
>>>>
>>>> $SPARK_HOME/bin/spark-submit \
>>>> --driver-class-path  $ACME_INGEST_HOME \
>>>> --driver-java-options "$JAVA_OPTIONS" \
>>>> --class
>>>> "com.acme.consumer.kafka.spark.KafkaSparkStreamingDriver" \
>>>> --master $SPARK_MASTER_URL  \
>>>> --conf
>>>> "spark.executor.extraClassPath=$ACME_INGEST_HOME/conf:$ACME_INGEST_HOME/lib/hbase-protocol-0.98.9-hadoop2.jar"
>>>> \
>>>>
>>>> $ACME_INGEST_HOME/lib/acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar \
>>>> -brokerlist $METADATA_BROKER_LIST \
>>>> -topic acme.topic1 \
>>>> -autooffsetreset largest \
>>>> -batchdurationmillis $ACME_BATCH_DURATION_MILLIS \
>>>> -appname Acme.App1 \
>>>> -checkpointdir file://$SPARK_HOME/acme/checkpoint-acme-app1
>>>> Note that SolrException is definitely in our consumer jar
>>>> acme-ingest-kafka-spark-$ACME_INGEST_VERSION.jar which gets deployed to
>>>> $ACME_INGEST_HOME.
>>>>
>>>> For the extraClassPath on the executors, we've got additionally
>>>> hbase-protocol-0.98.9-hadoop2.jar: we're using Apache Phoenix from the
>>>> Spark jobs to communicate with HBase.  The only way to force Phoenix to
>>>> successfully communicate with HBase was to have that JAR explicitly added
>>>> to the executor classpath regardless of the fact that the contents of the
>>>> hbase-protocol hadoop jar get rolled up into the consumer jar at build 
>>>> time.
>>>>
>>>> I'm starting to wonder whether there's some class loading pattern here
>>>> where some classes may not get loaded out of the consumer jar and therefore
>>>> have to have their respective jars added to the executor extraClassPath?
>>>>
>>>> Or is this a serialization problem for SolrException as Divya
>>>> Ravichandran suggested?
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Sep 29, 2015 at 6:16 PM, Ted Yu  wrote:
>>>>
>>>>> Mind providing a bit more information:
>>>>>
>>>>> release of Spark
>>>>> command line for running Spark job
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Tue, Sep 29, 2015 at 1:37 PM, Dmitry Goldenberg <
>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> We're seeing this occasionally. Granted, this was caused by a wrinkle
>>>>>> in the Solr schema but this bubbled up all the way in Spark and caused 
>>

Re: Broadcast var is null

2015-10-05 Thread Dmitry Pristin
Hi guys,
thanks a lot for responding so quickly!

I've reduced the code to the code below - no streaming, no Kafka, no
checkpoint. Unfortunately the end result is the same. Any suggestion to
where I'm messing up would be very much appreciated !

object BroadcastTest extends App {
  val logger = LoggerFactory.getLogger("OinkSparkMain")
  logger.info("OinkSparkMain - Setup Logger")

  val sparkConf = new SparkConf().setAppName("OinkSparkMain")
  val sc : SparkContext = new SparkContext(sparkConf)

  val rdd = sc.parallelize(Array(1,2,3));

  val arr = Array(1, 2, 3)
  val broadcastVar = sc.broadcast(arr)

  val mappedEvents =  rdd.map(e => {
val l = LoggerFactory.getLogger("OinkSparkMain1")

if (broadcastVar == null) {
  l.info("broadcastVar is null")
  (e, "empty")
}
else {
  val str = broadcastVar.value.mkString(" | ")
  l.info("broadcastVar is " + str)
  (e, str)
}
  })

  logger.info("** Total reduced count: " + mappedEvents.collect().length)
}


On Mon, Oct 5, 2015 at 4:14 PM, Adrian Tanase  wrote:

> FYI the same happens with accumulators when recovering from checkpoint.
> I'd love to see this fixed somehow as the workaround (using a singleton
> factory in foreachRdd to make sure the accumulators are initialized instead
> of null) is really intrusive...
>
> Sent from my iPhone
>
> On 05 Oct 2015, at 22:52, Tathagata Das  wrote:
>
> Make sure the broadcast variable works independent of the streaming
> application. Then make sure it work without have
> StreamingContext.getOrCreate(). That will disambiguate whether that error
> is thrown when starting a new context, or when recovering a context from
> checkpoint (as getOrCreate is supposed to do).
>
> On Mon, Oct 5, 2015 at 9:23 AM, dpristin  wrote:
>
>> Hi,
>>
>> Can anyone point me out to what I'm doing wrong? I've implemented a very
>> basic spark streaming app that uses a single broadcast variable. When it
>> runs locally it produces a proper output (the array I broadcast). But when
>> deployed on the cluster I get "broadcastVar is null". We use v 1.4.1. Here
>> is the code:
>>
>> --- imports go here
>>
>> object BroadcastTest extends App {
>>   val logger = LoggerFactory.getLogger("OinkSparkMain")
>>   logger.info("OinkSparkMain - Setup Logger")
>>
>> // This is our custom context setup code; nothing fancy goes on here
>>   val config = Configuration(args)
>>   val ssc: StreamingContext =
>> StreamingContext.getOrCreate(config.checkpointDirectory, () => {
>> SparkStreamingContextFactory.Create(config, timeWindow = Seconds(10))})
>>
>>
>>   val kafkaStreamFactory = new KafkaStreamFactory(config, ssc)
>>   val messages = kafkaStreamFactory.Create
>>
>>   // Grab the value data above kafka input dstream as a string
>>   val events = messages.map( s => s._2 )
>>
>>   //Create a broadcast variable - straight from the dev guide
>>   val broadcastVar = ssc.sparkContext.broadcast(Array(1, 2, 3))
>>
>>   //Try to print out the value of the broadcast var here
>>   val transformed = events.transform(rdd => {
>> rdd.map(x => {
>>   if(broadcastVar == null) {
>> println("broadcastVar is null")
>>   }  else {
>> println("broadcastVar value: " + broadcastVar.value.mkString("|"))
>>   }
>>   x
>> })
>>   })
>>
>>   transformed.foreachRDD(x => logger.info("Data: " +
>> x.collect.mkString("|")))
>>
>>   ssc.start()
>>   ssc.awaitTermination()
>> }
>>
>> Any input is very much appreciated!
>>
>> Regards,
>> Dmitry.
>>
>>
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Broadcast-var-is-null-tp24927.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com
>> <http://nabble.com>.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


What are the .snapshot files in /home/spark/Snapshots?

2015-11-10 Thread Dmitry Goldenberg
We're seeing a bunch of .snapshot files being created under
/home/spark/Snapshots, such as the following for example:

CoarseGrainedExecutorBackend-2015-08-27-shutdown.snapshot
CoarseGrainedExecutorBackend-2015-08-31-shutdown-1.snapshot
SparkSubmit-2015-08-31-shutdown-1.snapshot
Worker-2015-08-27-shutdown.snapshot

These files are large and they blow out our disk space in some environments.

What are these, when are they created and for what purpose?  Is there a way
to control how they're generated and most importantly where they're stored?

Thanks.


Re: What are the .snapshot files in /home/spark/Snapshots?

2015-11-10 Thread Dmitry Goldenberg
N/m, these are just profiling snapshots :) Sorry for the wide distribution.

On Tue, Nov 10, 2015 at 9:46 AM, Dmitry Goldenberg  wrote:

> We're seeing a bunch of .snapshot files being created under
> /home/spark/Snapshots, such as the following for example:
>
> CoarseGrainedExecutorBackend-2015-08-27-shutdown.snapshot
> CoarseGrainedExecutorBackend-2015-08-31-shutdown-1.snapshot
> SparkSubmit-2015-08-31-shutdown-1.snapshot
> Worker-2015-08-27-shutdown.snapshot
>
> These files are large and they blow out our disk space in some
> environments.
>
> What are these, when are they created and for what purpose?  Is there a
> way to control how they're generated and most importantly where they're
> stored?
>
> Thanks.
>


[no subject]

2015-11-26 Thread Dmitry Tolpeko



Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Dmitry Goldenberg
I've instrumented checkpointing per the programming guide and I can tell
that Spark Streaming is creating the checkpoint directories but I'm not
seeing any content being created in those directories nor am I seeing the
effects I'd expect from checkpointing.  I'd expect any data that comes into
Kafka while the consumers are down, to get picked up when the consumers are
restarted; I'm not seeing that.

For now my checkpoint directory is set to the local file system with the
directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
subdirectory named with a UUID being created under there but no files.

I'm using a custom JavaStreamingContextFactory which creates a
JavaStreamingContext with the directory set into it via the
checkpoint(String) method.

I'm currently not invoking the checkpoint(Duration) method on the DStream
since I want to first rely on Spark's default checkpointing interval.  My
streaming batch duration millis is set to 1 second.

Anyone have any idea what might be going wrong?

Also, at which point does Spark delete files from checkpointing?

Thanks.


Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Dmitry Goldenberg
I'll check the log info message..

Meanwhile, the code is basically

public class KafkaSparkStreamingDriver implements Serializable {

..

SparkConf sparkConf = createSparkConf(appName, kahunaEnv);

JavaStreamingContext jssc = params.isCheckpointed() ?
createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
params);


jssc.start();

jssc.awaitTermination();

jssc.close();

..

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
Parameters params) {

JavaStreamingContextFactory factory = new JavaStreamingContextFactory()
{

  @Override

  public JavaStreamingContext create() {

return createContext(sparkConf, params);

  }

};

return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
factory);

  }

...

  private JavaStreamingContext createContext(SparkConf sparkConf,
Parameters params) {

// Create context with the specified batch interval, in milliseconds.

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(params.getBatchDurationMillis()));

// Set the checkpoint directory, if we're checkpointing

if (params.isCheckpointed()) {

  jssc.checkpoint(params.getCheckpointDir());

}


Set topicsSet = new HashSet(Arrays.asList(params
.getTopic()));


// Set the Kafka parameters.

Map kafkaParams = new HashMap();

kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params
.getBrokerList());

if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {

  kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
.getAutoOffsetReset());

}


// Create direct Kafka stream with the brokers and the topic.

JavaPairInputDStream messages =
KafkaUtils.createDirectStream(

  jssc,

  String.class,

  String.class,

  StringDecoder.class,

  StringDecoder.class,

  kafkaParams,

  topicsSet);

// See if there's an override of the default checkpoint duration.

if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {

  messages.checkpoint(Durations.milliseconds(params
.getCheckpointMillis()));

}

.




On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen  wrote:

> If you've set the checkpoint dir, it seems like indeed the intent is
> to use a default checkpoint interval in DStream:
>
> private[streaming] def initialize(time: Time) {
> ...
>   // Set the checkpoint interval to be slideDuration or 10 seconds,
> which ever is larger
>   if (mustCheckpoint && checkpointDuration == null) {
> checkpointDuration = slideDuration * math.ceil(Seconds(10) /
> slideDuration).toInt
> logInfo("Checkpoint interval automatically set to " +
> checkpointDuration)
>   }
>
> Do you see that log message? what's the interval? that could at least
> explain why it's not doing anything, if it's quite long.
>
> It sort of seems wrong though since
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
> suggests it was intended to be a multiple of the batch interval. The
> slide duration wouldn't always be relevant anyway.
>
> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>  wrote:
> > I've instrumented checkpointing per the programming guide and I can tell
> > that Spark Streaming is creating the checkpoint directories but I'm not
> > seeing any content being created in those directories nor am I seeing the
> > effects I'd expect from checkpointing.  I'd expect any data that comes
> into
> > Kafka while the consumers are down, to get picked up when the consumers
> are
> > restarted; I'm not seeing that.
> >
> > For now my checkpoint directory is set to the local file system with the
> > directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
> > subdirectory named with a UUID being created under there but no files.
> >
> > I'm using a custom JavaStreamingContextFactory which creates a
> > JavaStreamingContext with the directory set into it via the
> > checkpoint(String) method.
> >
> > I'm currently not invoking the checkpoint(Duration) method on the DStream
> > since I want to first rely on Spark's default checkpointing interval.  My
> > streaming batch duration millis is set to 1 second.
> >
> > Anyone have any idea what might be going wrong?
> >
> > Also, at which point does Spark delete files from checkpointing?
> >
> > Thanks.
>


Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-07-31 Thread Dmitry Goldenberg
It looks like there's an issue with the 'Parameters' pojo I'm using within
my driver program. For some reason that needs to be serializable, which is
odd.

java.io.NotSerializableException: com.kona.consumer.kafka.spark.Parameters


Giving it another whirl though having to make it serializable seems odd to
me..

On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen  wrote:

> If you've set the checkpoint dir, it seems like indeed the intent is
> to use a default checkpoint interval in DStream:
>
> private[streaming] def initialize(time: Time) {
> ...
>   // Set the checkpoint interval to be slideDuration or 10 seconds,
> which ever is larger
>   if (mustCheckpoint && checkpointDuration == null) {
> checkpointDuration = slideDuration * math.ceil(Seconds(10) /
> slideDuration).toInt
> logInfo("Checkpoint interval automatically set to " +
> checkpointDuration)
>   }
>
> Do you see that log message? what's the interval? that could at least
> explain why it's not doing anything, if it's quite long.
>
> It sort of seems wrong though since
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
> suggests it was intended to be a multiple of the batch interval. The
> slide duration wouldn't always be relevant anyway.
>
> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>  wrote:
> > I've instrumented checkpointing per the programming guide and I can tell
> > that Spark Streaming is creating the checkpoint directories but I'm not
> > seeing any content being created in those directories nor am I seeing the
> > effects I'd expect from checkpointing.  I'd expect any data that comes
> into
> > Kafka while the consumers are down, to get picked up when the consumers
> are
> > restarted; I'm not seeing that.
> >
> > For now my checkpoint directory is set to the local file system with the
> > directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
> > subdirectory named with a UUID being created under there but no files.
> >
> > I'm using a custom JavaStreamingContextFactory which creates a
> > JavaStreamingContext with the directory set into it via the
> > checkpoint(String) method.
> >
> > I'm currently not invoking the checkpoint(Duration) method on the DStream
> > since I want to first rely on Spark's default checkpointing interval.  My
> > streaming batch duration millis is set to 1 second.
> >
> > Anyone have any idea what might be going wrong?
> >
> > Also, at which point does Spark delete files from checkpointing?
> >
> > Thanks.
>


How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
We're getting the below error.  Tried increasing spark.executor.memory e.g.
from 1g to 2g but the below error still happens.

Any recommendations? Something to do with specifying -Xmx in the submit job
scripts?

Thanks.

Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit
exceeded
at java.util.Arrays.copyOf(Arrays.java:3332)
at
java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
at
java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
at java.lang.StringBuilder.append(StringBuilder.java:136)
at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
at
org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
at
org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
at
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
at
org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
at
org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
at org.apache.spark.rdd.RDD.(RDD.scala:1365)
at org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
at
org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
at
org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
Thanks, Cody, will try that. Unfortunately due to a reinstall I don't have
the original checkpointing directory :(  Thanks for the clarification on
spark.driver.memory, I'll keep testing (at 2g things seem OK for now).

On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger  wrote:

> That looks like it's during recovery from a checkpoint, so it'd be driver
> memory not executor memory.
>
> How big is the checkpoint directory that you're trying to restore from?
>
> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> We're getting the below error.  Tried increasing spark.executor.memory
>> e.g. from 1g to 2g but the below error still happens.
>>
>> Any recommendations? Something to do with specifying -Xmx in the submit
>> job scripts?
>>
>> Thanks.
>>
>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead limit
>> exceeded
>> at java.util.Arrays.copyOf(Arrays.java:3332)
>> at
>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>> at
>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>> at
>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>> at
>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>> at
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>> at
>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>> at
>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>> at scala.Option.getOrElse(Option.scala:120)
>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>> at org.apache.spark.rdd.RDD.(RDD.scala:1365)
>> at org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>> at
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at
>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>> at scala.collection.immutable.List.foreach(List.scala:318)
>> at
>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>> at
>> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
>>
>>
>>
>>
>


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
Would there be a way to chunk up/batch up the contents of the checkpointing
directories as they're being processed by Spark Streaming?  Is it mandatory
to load the whole thing in one go?

On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:

> I wonder during recovery from a checkpoint whether we can estimate the
> size of the checkpoint and compare with Runtime.getRuntime().freeMemory().
>
> If the size of checkpoint is much bigger than free memory, log warning, etc
>
> Cheers
>
> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
>> have the original checkpointing directory :(  Thanks for the clarification
>> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>>
>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
>> wrote:
>>
>>> That looks like it's during recovery from a checkpoint, so it'd be
>>> driver memory not executor memory.
>>>
>>> How big is the checkpoint directory that you're trying to restore from?
>>>
>>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> We're getting the below error.  Tried increasing spark.executor.memory
>>>> e.g. from 1g to 2g but the below error still happens.
>>>>
>>>> Any recommendations? Something to do with specifying -Xmx in the submit
>>>> job scripts?
>>>>
>>>> Thanks.
>>>>
>>>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
>>>> limit exceeded
>>>> at java.util.Arrays.copyOf(Arrays.java:3332)
>>>> at
>>>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>>> at
>>>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>>> at
>>>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>>>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>>>> at
>>>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>>>> at
>>>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>>>> at
>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>>>> at
>>>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>> at
>>>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>> at scala.Option.getOrElse(Option.scala:120)
>>>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>>>> at org.apache.spark.rdd.RDD.(RDD.scala:1365)
>>>> at org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46)
>>>> at
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>>>> at
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:153)
>>>> at
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>> at
>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData.restore(DirectKafkaInputDStream.scala:153)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:402)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream$$anonfun$restoreCheckpointData$2.apply(DStream.scala:403)
>>>> at scala.collection.immutable.List.foreach(List.scala:318)
>>>> at
>>>> org.apache.spark.streaming.dstream.DStream.restoreCheckpointData(DStream.scala:403)
>>>> at
>>>> org.apache.spark.streaming.DStreamGraph$$anonfun$restoreCheckpointData$2.apply(DStreamGraph.scala:149)
>>>>
>>>>
>>>>
>>>>
>>>
>>
>


Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
"You need to keep a certain number of rdds around for checkpointing" --
that seems like a hefty expense to pay in order to achieve fault
tolerance.  Why does Spark persist whole RDD's of data?  Shouldn't it be
sufficient to just persist the offsets, to know where to resume from?

Thanks.

On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger  wrote:

> You need to keep a certain number of rdds around for checkpointing, based
> on e.g. the window size.  Those would all need to be loaded at once.
>
> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Would there be a way to chunk up/batch up the contents of the
>> checkpointing directories as they're being processed by Spark Streaming?
>> Is it mandatory to load the whole thing in one go?
>>
>> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:
>>
>>> I wonder during recovery from a checkpoint whether we can estimate the
>>> size of the checkpoint and compare with Runtime.getRuntime().freeMemory
>>> ().
>>>
>>> If the size of checkpoint is much bigger than free memory, log warning,
>>> etc
>>>
>>> Cheers
>>>
>>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
>>>> have the original checkpointing directory :(  Thanks for the clarification
>>>> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>>>>
>>>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
>>>> wrote:
>>>>
>>>>> That looks like it's during recovery from a checkpoint, so it'd be
>>>>> driver memory not executor memory.
>>>>>
>>>>> How big is the checkpoint directory that you're trying to restore from?
>>>>>
>>>>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> We're getting the below error.  Tried increasing
>>>>>> spark.executor.memory e.g. from 1g to 2g but the below error still 
>>>>>> happens.
>>>>>>
>>>>>> Any recommendations? Something to do with specifying -Xmx in the
>>>>>> submit job scripts?
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
>>>>>> limit exceeded
>>>>>> at java.util.Arrays.copyOf(Arrays.java:3332)
>>>>>> at
>>>>>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>>>>> at
>>>>>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>>>>> at
>>>>>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>>>>>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>>>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>>>>>> at
>>>>>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>>>>>> at
>>>>>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1190)
>>>>>> at
>>>>>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>>>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>>>> at org.apache.spark.util.Utils$.getCallSite(Utils.scala:1190)
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>>>> at
>>>>>> org.apache.spark.SparkContext$$anonfun$getCallSite$2.apply(SparkContext.scala:1441)
>>>>>> at scala.Option.getOrElse(Option.scala:120)
>>>>>> at org.apache.spark.SparkContext.getCallSite(SparkContext.scala:1441)
>>>>>> at org.apache.spark.rdd.RDD.(RDD.scala:1365)
>>>>>> at org.apache.spark.streaming.kafka.KafkaRDD.(KafkaRDD.scala:46)
>>>>>> at
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$DirectKafkaInputDStreamCheckpointData$$anonfun$restore$2.apply(DirectKafkaInputDStream.scala:155)
>>>>>> at
>>>>>> org.apache.spark.streaming.kafka.DirectKafkaInputDStream$Direct

Re: How to fix OutOfMemoryError: GC overhead limit exceeded when using Spark Streaming checkpointing

2015-08-10 Thread Dmitry Goldenberg
Well, RDD"s also contain data, don't they?

The question is, what can be so hefty in the checkpointing directory to
cause Spark driver to run out of memory?  It seems that it makes
checkpointing expensive, in terms of I/O and memory consumption.  Two
network hops -- to driver, then to workers.  Hefty file system usage, hefty
memory consumption...   What can we do to offset some of these costs?



On Mon, Aug 10, 2015 at 4:27 PM, Cody Koeninger  wrote:

> The rdd is indeed defined by mostly just the offsets / topic partitions.
>
> On Mon, Aug 10, 2015 at 3:24 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> "You need to keep a certain number of rdds around for checkpointing" --
>> that seems like a hefty expense to pay in order to achieve fault
>> tolerance.  Why does Spark persist whole RDD's of data?  Shouldn't it be
>> sufficient to just persist the offsets, to know where to resume from?
>>
>> Thanks.
>>
>>
>> On Mon, Aug 10, 2015 at 1:07 PM, Cody Koeninger 
>> wrote:
>>
>>> You need to keep a certain number of rdds around for checkpointing,
>>> based on e.g. the window size.  Those would all need to be loaded at once.
>>>
>>> On Mon, Aug 10, 2015 at 11:49 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> Would there be a way to chunk up/batch up the contents of the
>>>> checkpointing directories as they're being processed by Spark Streaming?
>>>> Is it mandatory to load the whole thing in one go?
>>>>
>>>> On Mon, Aug 10, 2015 at 12:42 PM, Ted Yu  wrote:
>>>>
>>>>> I wonder during recovery from a checkpoint whether we can estimate
>>>>> the size of the checkpoint and compare with Runtime.getRuntime().
>>>>> freeMemory().
>>>>>
>>>>> If the size of checkpoint is much bigger than free memory, log
>>>>> warning, etc
>>>>>
>>>>> Cheers
>>>>>
>>>>> On Mon, Aug 10, 2015 at 9:34 AM, Dmitry Goldenberg <
>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> Thanks, Cody, will try that. Unfortunately due to a reinstall I don't
>>>>>> have the original checkpointing directory :(  Thanks for the 
>>>>>> clarification
>>>>>> on spark.driver.memory, I'll keep testing (at 2g things seem OK for now).
>>>>>>
>>>>>> On Mon, Aug 10, 2015 at 12:10 PM, Cody Koeninger 
>>>>>> wrote:
>>>>>>
>>>>>>> That looks like it's during recovery from a checkpoint, so it'd be
>>>>>>> driver memory not executor memory.
>>>>>>>
>>>>>>> How big is the checkpoint directory that you're trying to restore
>>>>>>> from?
>>>>>>>
>>>>>>> On Mon, Aug 10, 2015 at 10:57 AM, Dmitry Goldenberg <
>>>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>>>
>>>>>>>> We're getting the below error.  Tried increasing
>>>>>>>> spark.executor.memory e.g. from 1g to 2g but the below error still 
>>>>>>>> happens.
>>>>>>>>
>>>>>>>> Any recommendations? Something to do with specifying -Xmx in the
>>>>>>>> submit job scripts?
>>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> Exception in thread "main" java.lang.OutOfMemoryError: GC overhead
>>>>>>>> limit exceeded
>>>>>>>> at java.util.Arrays.copyOf(Arrays.java:3332)
>>>>>>>> at
>>>>>>>> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:137)
>>>>>>>> at
>>>>>>>> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:121)
>>>>>>>> at
>>>>>>>> java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:421)
>>>>>>>> at java.lang.StringBuilder.append(StringBuilder.java:136)
>>>>>>>> at java.lang.StackTraceElement.toString(StackTraceElement.java:173)
>>>>>>>> at
>>>>>>>> org.apache.spark.util.Utils$$anonfun$getCallSite$1.apply(Utils.scala:1212)
>>>>>>>> at
>>>&g

Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-08-14 Thread Dmitry Goldenberg
Our additional question on checkpointing is basically the logistics of it --

At which point does the data get written into checkpointing?  Is it written
as soon as the driver program retrieves an RDD from Kafka (or another
source)?  Or, is it written after that RDD has been processed and we're
basically moving on to the next RDD?

What I'm driving at is, what happens if the driver program is killed?  The
next time it's started, will it know, from Spark Streaming's checkpointing,
to resume from the same RDD that was being processed at the time of the
program getting killed?  In other words, will we, upon restarting the
consumer, resume from the RDD that was unfinished, or will we be looking at
the next RDD?

Will we pick up from the last known *successfully processed* topic offset?

Thanks.




On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen  wrote:

> If you've set the checkpoint dir, it seems like indeed the intent is
> to use a default checkpoint interval in DStream:
>
> private[streaming] def initialize(time: Time) {
> ...
>   // Set the checkpoint interval to be slideDuration or 10 seconds,
> which ever is larger
>   if (mustCheckpoint && checkpointDuration == null) {
> checkpointDuration = slideDuration * math.ceil(Seconds(10) /
> slideDuration).toInt
> logInfo("Checkpoint interval automatically set to " +
> checkpointDuration)
>   }
>
> Do you see that log message? what's the interval? that could at least
> explain why it's not doing anything, if it's quite long.
>
> It sort of seems wrong though since
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
> suggests it was intended to be a multiple of the batch interval. The
> slide duration wouldn't always be relevant anyway.
>
> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>  wrote:
> > I've instrumented checkpointing per the programming guide and I can tell
> > that Spark Streaming is creating the checkpoint directories but I'm not
> > seeing any content being created in those directories nor am I seeing the
> > effects I'd expect from checkpointing.  I'd expect any data that comes
> into
> > Kafka while the consumers are down, to get picked up when the consumers
> are
> > restarted; I'm not seeing that.
> >
> > For now my checkpoint directory is set to the local file system with the
> > directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
> > subdirectory named with a UUID being created under there but no files.
> >
> > I'm using a custom JavaStreamingContextFactory which creates a
> > JavaStreamingContext with the directory set into it via the
> > checkpoint(String) method.
> >
> > I'm currently not invoking the checkpoint(Duration) method on the DStream
> > since I want to first rely on Spark's default checkpointing interval.  My
> > streaming batch duration millis is set to 1 second.
> >
> > Anyone have any idea what might be going wrong?
> >
> > Also, at which point does Spark delete files from checkpointing?
> >
> > Thanks.
>


Re: Checkpointing doesn't appear to be working for direct streaming from Kafka

2015-08-14 Thread Dmitry Goldenberg
Thanks, Cody. It sounds like Spark Streaming has enough state info to know
how many batches have been processed and if not all of them then the RDD is
'unfinished'. I wonder if it would know whether the last micro-batch has
been fully processed successfully. Hypothetically, the driver program could
terminate as the last batch is being processed...

On Fri, Aug 14, 2015 at 6:17 PM, Cody Koeninger  wrote:

> You'll resume and re-process the rdd that didnt finish
>
> On Fri, Aug 14, 2015 at 1:31 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Our additional question on checkpointing is basically the logistics of it
>> --
>>
>> At which point does the data get written into checkpointing?  Is it
>> written as soon as the driver program retrieves an RDD from Kafka (or
>> another source)?  Or, is it written after that RDD has been processed and
>> we're basically moving on to the next RDD?
>>
>> What I'm driving at is, what happens if the driver program is killed?
>> The next time it's started, will it know, from Spark Streaming's
>> checkpointing, to resume from the same RDD that was being processed at the
>> time of the program getting killed?  In other words, will we, upon
>> restarting the consumer, resume from the RDD that was unfinished, or will
>> we be looking at the next RDD?
>>
>> Will we pick up from the last known *successfully processed* topic
>> offset?
>>
>> Thanks.
>>
>>
>>
>>
>> On Fri, Jul 31, 2015 at 1:52 PM, Sean Owen  wrote:
>>
>>> If you've set the checkpoint dir, it seems like indeed the intent is
>>> to use a default checkpoint interval in DStream:
>>>
>>> private[streaming] def initialize(time: Time) {
>>> ...
>>>   // Set the checkpoint interval to be slideDuration or 10 seconds,
>>> which ever is larger
>>>   if (mustCheckpoint && checkpointDuration == null) {
>>> checkpointDuration = slideDuration * math.ceil(Seconds(10) /
>>> slideDuration).toInt
>>> logInfo("Checkpoint interval automatically set to " +
>>> checkpointDuration)
>>>   }
>>>
>>> Do you see that log message? what's the interval? that could at least
>>> explain why it's not doing anything, if it's quite long.
>>>
>>> It sort of seems wrong though since
>>> https://spark.apache.org/docs/latest/streaming-programming-guide.html
>>> suggests it was intended to be a multiple of the batch interval. The
>>> slide duration wouldn't always be relevant anyway.
>>>
>>> On Fri, Jul 31, 2015 at 6:16 PM, Dmitry Goldenberg
>>>  wrote:
>>> > I've instrumented checkpointing per the programming guide and I can
>>> tell
>>> > that Spark Streaming is creating the checkpoint directories but I'm not
>>> > seeing any content being created in those directories nor am I seeing
>>> the
>>> > effects I'd expect from checkpointing.  I'd expect any data that comes
>>> into
>>> > Kafka while the consumers are down, to get picked up when the
>>> consumers are
>>> > restarted; I'm not seeing that.
>>> >
>>> > For now my checkpoint directory is set to the local file system with
>>> the
>>> > directory URI being in this form:   file:///mnt/dir1/dir2.  I see a
>>> > subdirectory named with a UUID being created under there but no files.
>>> >
>>> > I'm using a custom JavaStreamingContextFactory which creates a
>>> > JavaStreamingContext with the directory set into it via the
>>> > checkpoint(String) method.
>>> >
>>> > I'm currently not invoking the checkpoint(Duration) method on the
>>> DStream
>>> > since I want to first rely on Spark's default checkpointing interval.
>>> My
>>> > streaming batch duration millis is set to 1 second.
>>> >
>>> > Anyone have any idea what might be going wrong?
>>> >
>>> > Also, at which point does Spark delete files from checkpointing?
>>> >
>>> > Thanks.
>>>
>>
>>
>


Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-03 Thread Dmitry Goldenberg
I'm seeing an oddity where I initially set the batchdurationmillis to 1
second and it works fine:

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(batchDurationMillis));

Then I tried changing the value to 10 seconds. The change didn't seem to
take. I've bounced the Spark workers and the consumers and now I'm seeing
RDD's coming in once around 10 seconds (not always 10 seconds according to
the logs).

However, now I'm trying to change the value to 20 seconds and it's just not
taking. I've bounced Spark master, workers, and consumers and the value
seems "stuck" at 10 seconds.

Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.

Thanks.

- Dmitry


Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Dmitry Goldenberg
Tathagata,

Checkpointing is turned on but we were not recovering. I'm looking at the
logs now, feeding fresh content hours after the restart. Here's a snippet:

2015-09-04 06:11:20,013 ... Documents processed: 0.
2015-09-04 06:11:30,014 ... Documents processed: 0.
2015-09-04 06:11:40,011 ... Documents processed: 0.
2015-09-04 06:11:50,012 ... Documents processed: 0.
2015-09-04 06:12:00,010 ... Documents processed: 0.
2015-09-04 06:12:10,047 ... Documents processed: 0.
2015-09-04 06:12:20,012 ... Documents processed: 0.
2015-09-04 06:12:30,011 ... Documents processed: 0.
2015-09-04 06:12:40,012 ... Documents processed: 0.
*2015-09-04 06:12:55,629 ... Documents processed: 4.*
2015-09-04 06:13:00,018 ... Documents processed: 0.
2015-09-04 06:13:10,012 ... Documents processed: 0.
2015-09-04 06:13:20,019 ... Documents processed: 0.
2015-09-04 06:13:30,014 ... Documents processed: 0.
2015-09-04 06:13:40,041 ... Documents processed: 0.
2015-09-04 06:13:50,009 ... Documents processed: 0.
...
2015-09-04 06:17:30,019 ... Documents processed: 0.
*2015-09-04 06:17:46,832 ... Documents processed: 40.*

Interestingly, the fresh content (4 documents) is fed about 5.6 seconds
after the previous batch, not 10 seconds. The second fresh batch comes in
6.8 seconds after the previous empty one.

Granted, the log message is printed after iterating over the messages which
may account for some time differences. But generally, looking at the log
messages being printed before we iterate, it's still 10 seconds each time,
not 20 which is what batchdurationmillis is currently set to.

Code:

JavaPairInputDStream messages =
KafkaUtils.createDirectStream();
messages.checkpoint(Durations.milliseconds(checkpointMillis));


  JavaDStream messageBodies = messages.map(new Function, String>() {
  @Override
  public String call(Tuple2 tuple2) {
return tuple2._2();
  }
});

messageBodies.foreachRDD(new Function, Void>() {
  @Override
  public Void call(JavaRDD rdd) throws Exception {
ProcessPartitionFunction func = new ProcessPartitionFunction(...);
rdd.foreachPartition(func);
return null;
  }
});

The log message comes from ProcessPartitionFunction:

public void call(Iterator messageIterator) throws Exception {
log.info("Starting data partition processing. AppName={}, topic={}.)...",
appName, topic);
// ... iterate ...
log.info("Finished data partition processing (appName={}, topic={}).
Documents processed: {}.", appName, topic, docCount);
}

Any ideas? Thanks.

- Dmitry

On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das  wrote:

> Are you accidentally recovering from checkpoint files which has 10 second
> as the batch interval?
>
>
> On Thu, Sep 3, 2015 at 7:34 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I'm seeing an oddity where I initially set the batchdurationmillis to 1
>> second and it works fine:
>>
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.milliseconds(batchDurationMillis));
>>
>> Then I tried changing the value to 10 seconds. The change didn't seem to
>> take. I've bounced the Spark workers and the consumers and now I'm seeing
>> RDD's coming in once around 10 seconds (not always 10 seconds according to
>> the logs).
>>
>> However, now I'm trying to change the value to 20 seconds and it's just
>> not taking. I've bounced Spark master, workers, and consumers and the value
>> seems "stuck" at 10 seconds.
>>
>> Any ideas? We're running Spark 1.3.0 built for Hadoop 2.4.
>>
>> Thanks.
>>
>> - Dmitry
>>
>>
>


Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Dmitry Goldenberg
Tathagata,

In our logs I see the batch duration millis being set first to 10 then to
20 seconds. I don't see the 20 being reflected later during ingestion.

In the Spark UI under Streaming I see the below output, notice the *10
second* Batch interval.  Can you think of a reason why it's stuck at 10?
It used to be 1 second by the way, then somehow over the course of a few
restarts we managed to get it to be 10 seconds.  Now it won't get reset to
20 seconds.  Any ideas?

Streaming

   - *Started at: *Thu Sep 03 10:59:03 EDT 2015
   - *Time since start: *1 day 8 hours 44 minutes
   - *Network receivers: *0
   - *Batch interval: *10 seconds
   - *Processed batches: *11790
   - *Waiting batches: *0
   - *Received records: *0
   - *Processed records: *0



Statistics over last 100 processed batchesReceiver Statistics
No receivers
Batch Processing Statistics

   MetricLast batchMinimum25th percentileMedian75th percentileMaximumProcessing
   Time23 ms7 ms10 ms11 ms14 ms172 msScheduling Delay1 ms0 ms0 ms0 ms1 ms2
   msTotal Delay24 ms8 ms10 ms12 ms14 ms173 ms





On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das  wrote:

> Could you see what the streaming tab in the Spark UI says? It should show
> the underlying batch duration of the StreamingContext, the details of when
> the batch starts, etc.
>
> BTW, it seems that the 5.6 or 6.8 seconds delay is present only when data
> is present (that is, * Documents processed: > 0)*
>
> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Tathagata,
>>
>> Checkpointing is turned on but we were not recovering. I'm looking at the
>> logs now, feeding fresh content hours after the restart. Here's a snippet:
>>
>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>> ...
>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>
>> Interestingly, the fresh content (4 documents) is fed about 5.6 seconds
>> after the previous batch, not 10 seconds. The second fresh batch comes in
>> 6.8 seconds after the previous empty one.
>>
>> Granted, the log message is printed after iterating over the messages
>> which may account for some time differences. But generally, looking at the
>> log messages being printed before we iterate, it's still 10 seconds each
>> time, not 20 which is what batchdurationmillis is currently set to.
>>
>> Code:
>>
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream();
>> messages.checkpoint(Durations.milliseconds(checkpointMillis));
>>
>>
>>   JavaDStream messageBodies = messages.map(new 
>> Function> String>, String>() {
>>   @Override
>>   public String call(Tuple2 tuple2) {
>> return tuple2._2();
>>   }
>> });
>>
>> messageBodies.foreachRDD(new Function, Void>() {
>>   @Override
>>   public Void call(JavaRDD rdd) throws Exception {
>> ProcessPartitionFunction func = new ProcessPartitionFunction(...);
>> rdd.foreachPartition(func);
>> return null;
>>   }
>> });
>>
>> The log message comes from ProcessPartitionFunction:
>>
>> public void call(Iterator messageIterator) throws Exception {
>> log.info("Starting data partition processing. AppName={},
>> topic={}.)...", appName, topic);
>> // ... iterate ...
>> log.info("Finished data partition processing (appName={}, topic={}).
>> Documents processed: {}.", appName, topic, docCount);
>> }
>>
>> Any ideas? Thanks.
>>
>> - Dmitry
>>
>> On Thu, Sep 3, 2015 at 10:45 PM, Tathagata Das 
>> wrote:
>>
>>> Are you accidentally recovering from checkpoint files which h

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Dmitry Goldenberg
I'd think that we wouldn't be "accidentally recovering from checkpoint"
hours or even days after consumers have been restarted, plus the content is
the fresh content that I'm feeding, not some content that had been fed
before the last restart.

The code is basically as follows:

SparkConf sparkConf = createSparkConf(...);
// We'd be 'checkpointed' because we specify a checkpoint directory
which makes isCheckpointed true
JavaStreamingContext jssc = params.isCheckpointed() ?
createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
params);jssc.start();

jssc.awaitTermination();

jssc.close();



On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das  wrote:

> Are you sure you are not accidentally recovering from checkpoint? How are
> you using StreamingContext.getOrCreate() in your code?
>
> TD
>
> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Tathagata,
>>
>> In our logs I see the batch duration millis being set first to 10 then to
>> 20 seconds. I don't see the 20 being reflected later during ingestion.
>>
>> In the Spark UI under Streaming I see the below output, notice the *10
>> second* Batch interval.  Can you think of a reason why it's stuck at
>> 10?  It used to be 1 second by the way, then somehow over the course of a
>> few restarts we managed to get it to be 10 seconds.  Now it won't get reset
>> to 20 seconds.  Any ideas?
>>
>> Streaming
>>
>>- *Started at: *Thu Sep 03 10:59:03 EDT 2015
>>- *Time since start: *1 day 8 hours 44 minutes
>>- *Network receivers: *0
>>- *Batch interval: *10 seconds
>>- *Processed batches: *11790
>>- *Waiting batches: *0
>>- *Received records: *0
>>- *Processed records: *0
>>
>>
>>
>> Statistics over last 100 processed batchesReceiver Statistics
>> No receivers
>> Batch Processing Statistics
>>
>>MetricLast batchMinimum25th percentileMedian75th 
>> percentileMaximumProcessing
>>Time23 ms7 ms10 ms11 ms14 ms172 msScheduling Delay1 ms0 ms0 ms0 ms1 ms2
>>msTotal Delay24 ms8 ms10 ms12 ms14 ms173 ms
>>
>>
>>
>>
>>
>> On Fri, Sep 4, 2015 at 3:50 PM, Tathagata Das 
>> wrote:
>>
>>> Could you see what the streaming tab in the Spark UI says? It should
>>> show the underlying batch duration of the StreamingContext, the details of
>>> when the batch starts, etc.
>>>
>>> BTW, it seems that the 5.6 or 6.8 seconds delay is present only when
>>> data is present (that is, * Documents processed: > 0)*
>>>
>>> On Fri, Sep 4, 2015 at 3:38 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> Tathagata,
>>>>
>>>> Checkpointing is turned on but we were not recovering. I'm looking at
>>>> the logs now, feeding fresh content hours after the restart. Here's a
>>>> snippet:
>>>>
>>>> 2015-09-04 06:11:20,013 ... Documents processed: 0.
>>>> 2015-09-04 06:11:30,014 ... Documents processed: 0.
>>>> 2015-09-04 06:11:40,011 ... Documents processed: 0.
>>>> 2015-09-04 06:11:50,012 ... Documents processed: 0.
>>>> 2015-09-04 06:12:00,010 ... Documents processed: 0.
>>>> 2015-09-04 06:12:10,047 ... Documents processed: 0.
>>>> 2015-09-04 06:12:20,012 ... Documents processed: 0.
>>>> 2015-09-04 06:12:30,011 ... Documents processed: 0.
>>>> 2015-09-04 06:12:40,012 ... Documents processed: 0.
>>>> *2015-09-04 06:12:55,629 ... Documents processed: 4.*
>>>> 2015-09-04 06:13:00,018 ... Documents processed: 0.
>>>> 2015-09-04 06:13:10,012 ... Documents processed: 0.
>>>> 2015-09-04 06:13:20,019 ... Documents processed: 0.
>>>> 2015-09-04 06:13:30,014 ... Documents processed: 0.
>>>> 2015-09-04 06:13:40,041 ... Documents processed: 0.
>>>> 2015-09-04 06:13:50,009 ... Documents processed: 0.
>>>> ...
>>>> 2015-09-04 06:17:30,019 ... Documents processed: 0.
>>>> *2015-09-04 06:17:46,832 ... Documents processed: 40.*
>>>>
>>>> Interestingly, the fresh content (4 documents) is fed about 5.6 seconds
>>>> after the previous batch, not 10 seconds. The second fresh batch comes in
>>>> 6.8 seconds after the previous empty one.
>>>>
>>>> Granted, the log message is printed after iterating over the messages
>>>> which may account for some time differences. But generally, looki

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-04 Thread Dmitry Goldenberg
Sorry, more relevant code below:

SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
JavaStreamingContext jssc = params.isCheckpointed() ?
createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
params);
jssc.start();
jssc.awaitTermination();
jssc.close();
………..
  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
Parameters params) {
JavaStreamingContextFactory factory = new JavaStreamingContextFactory()
{
  @Override
  public JavaStreamingContext create() {
return createContext(sparkConf, params);
  }
};
return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
factory);
  }

  private JavaStreamingContext createContext(SparkConf sparkConf,
Parameters params) {
// Create context with the specified batch interval, in milliseconds.
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(params.getBatchDurationMillis()));
// Set the checkpoint directory, if we're checkpointing
if (params.isCheckpointed()) {
  jssc.checkpoint(params.getCheckpointDir());
}

Set topicsSet = new HashSet(Arrays.asList(params
.getTopic()));

// Set the Kafka parameters.
Map kafkaParams = new HashMap();
kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params
.getBrokerList());
if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
  kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
.getAutoOffsetReset());
}

// Create direct Kafka stream with the brokers and the topic.
JavaPairInputDStream messages =
KafkaUtils.createDirectStream(
  jssc,
  String.class,
  String.class,
  StringDecoder.class,
  StringDecoder.class,
  kafkaParams,
  topicsSet);

// See if there's an override of the default checkpoint duration.
if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
  messages.checkpoint(Durations.milliseconds(params
.getCheckpointMillis()));
}

JavaDStream messageBodies = messages.map(new
Function, String>() {
  @Override
  public String call(Tuple2 tuple2) {
return tuple2._2();
  }
});

messageBodies.foreachRDD(new Function, Void>() {
  @Override
  public Void call(JavaRDD rdd) throws Exception {
ProcessPartitionFunction func = new
ProcessPartitionFunction(params);
rdd.foreachPartition(func);
return null;
  }
});

return jssc;
}

On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg 
wrote:

> I'd think that we wouldn't be "accidentally recovering from checkpoint"
> hours or even days after consumers have been restarted, plus the content is
> the fresh content that I'm feeding, not some content that had been fed
> before the last restart.
>
> The code is basically as follows:
>
> SparkConf sparkConf = createSparkConf(...);
> // We'd be 'checkpointed' because we specify a checkpoint directory
> which makes isCheckpointed true
> JavaStreamingContext jssc = params.isCheckpointed() ?
> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
> params);jssc.start();
>
> jssc.awaitTermination();
>
> jssc.close();
>
>
>
> On Fri, Sep 4, 2015 at 8:48 PM, Tathagata Das  wrote:
>
>> Are you sure you are not accidentally recovering from checkpoint? How are
>> you using StreamingContext.getOrCreate() in your code?
>>
>> TD
>>
>> On Fri, Sep 4, 2015 at 4:53 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Tathagata,
>>>
>>> In our logs I see the batch duration millis being set first to 10 then
>>> to 20 seconds. I don't see the 20 being reflected later during ingestion.
>>>
>>> In the Spark UI under Streaming I see the below output, notice the *10
>>> second* Batch interval.  Can you think of a reason why it's stuck at
>>> 10?  It used to be 1 second by the way, then somehow over the course of a
>>> few restarts we managed to get it to be 10 seconds.  Now it won't get reset
>>> to 20 seconds.  Any ideas?
>>>
>>> Streaming
>>>
>>>- *Started at: *Thu Sep 03 10:59:03 EDT 2015
>>>- *Time since start: *1 day 8 hours 44 minutes
>>>- *Network receivers: *0
>>>- *Batch interval: *10 seconds
>>>- *Processed batches: *11790
>>>- *Waiting batches: *0
>>>- *Received records: *0
>>>- *Processed records: *0
>>>
>>>
>>>
>>> Statistics over last 100 processed batchesReceiver Statistics
>>> No receivers
>>> Batch Processing Statistics
>>>
>>>MetricLast batchMini

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
I've stopped the jobs, the workers, and the master. Deleted the contents of
the checkpointing dir. Then restarted master, workers, and consumers.

I'm seeing the job in question still firing every 10 seconds.  I'm seeing
the 10 seconds in the Spark Jobs GUI page as well as our logs.  Seems quite
strange given that the jobs used to fire every 1 second, we've switched to
10, now trying to switch to 20 and batch duration millis is not changing.

Does anything stand out in the code perhaps?

On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger  wrote:

> Have you tried deleting or moving the contents of the checkpoint directory
> and restarting the job?
>
> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Sorry, more relevant code below:
>>
>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>> JavaStreamingContext jssc = params.isCheckpointed() ?
>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>> params);
>> jssc.start();
>> jssc.awaitTermination();
>> jssc.close();
>> ………..
>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>> sparkConf, Parameters params) {
>> JavaStreamingContextFactory factory = new
>> JavaStreamingContextFactory() {
>>   @Override
>>   public JavaStreamingContext create() {
>> return createContext(sparkConf, params);
>>   }
>> };
>> return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>> factory);
>>   }
>>
>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>> Parameters params) {
>> // Create context with the specified batch interval, in milliseconds.
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.milliseconds(params.getBatchDurationMillis()));
>> // Set the checkpoint directory, if we're checkpointing
>> if (params.isCheckpointed()) {
>>   jssc.checkpoint(params.getCheckpointDir());
>> }
>>
>> Set topicsSet = new HashSet(Arrays.asList(params
>> .getTopic()));
>>
>> // Set the Kafka parameters.
>> Map kafkaParams = new HashMap();
>> kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST, params
>> .getBrokerList());
>> if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>   kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
>> .getAutoOffsetReset());
>> }
>>
>> // Create direct Kafka stream with the brokers and the topic.
>> JavaPairInputDStream messages =
>> KafkaUtils.createDirectStream(
>>   jssc,
>>   String.class,
>>   String.class,
>>   StringDecoder.class,
>>   StringDecoder.class,
>>   kafkaParams,
>>   topicsSet);
>>
>> // See if there's an override of the default checkpoint duration.
>> if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>>   messages.checkpoint(Durations.milliseconds(params
>> .getCheckpointMillis()));
>> }
>>
>> JavaDStream messageBodies = messages.map(new
>> Function, String>() {
>>   @Override
>>   public String call(Tuple2 tuple2) {
>> return tuple2._2();
>>   }
>> });
>>
>> messageBodies.foreachRDD(new Function, Void>() {
>>   @Override
>>   public Void call(JavaRDD rdd) throws Exception {
>> ProcessPartitionFunction func = new
>> ProcessPartitionFunction(params);
>> rdd.foreachPartition(func);
>> return null;
>>   }
>> });
>>
>> return jssc;
>> }
>>
>> On Fri, Sep 4, 2015 at 8:57 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> I'd think that we wouldn't be "accidentally recovering from checkpoint"
>>> hours or even days after consumers have been restarted, plus the content is
>>> the fresh content that I'm feeding, not some content that had been fed
>>> before the last restart.
>>>
>>> The code is basically as follows:
>>>
>>> SparkConf sparkConf = createSparkConf(...);
>>> // We'd be 'checkpointed' because we specify a checkpoint directory
>>> which makes isCheckpointed true
>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>>> params);jssc.start();
>>>
>>> 

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
Just verified the logic for passing the batch duration millis in, looks OK.
I see the value of 20 seconds being reflected in the logs - but not in the
spark ui.

Also, just commented out this piece and the consumer is still stuck at
using 10 seconds for batch duration millis.

//if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
//
messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
//}

The reason this is in the code is so that we can control the checkpointing
millis.  Doing this through the checkpoint() method seems the only way to
override the default value which is max(batchdurationmillis, 10seconds).
Is there a better way of doing this?

Thanks.


On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger  wrote:

> Well, I'm not sure why you're checkpointing messages.
>
> I'd also put in some logging to see what values are actually being read
> out of your params object for the various settings.
>
>
> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I've stopped the jobs, the workers, and the master. Deleted the contents
>> of the checkpointing dir. Then restarted master, workers, and consumers.
>>
>> I'm seeing the job in question still firing every 10 seconds.  I'm seeing
>> the 10 seconds in the Spark Jobs GUI page as well as our logs.  Seems quite
>> strange given that the jobs used to fire every 1 second, we've switched to
>> 10, now trying to switch to 20 and batch duration millis is not changing.
>>
>> Does anything stand out in the code perhaps?
>>
>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger 
>> wrote:
>>
>>> Have you tried deleting or moving the contents of the checkpoint
>>> directory and restarting the job?
>>>
>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> Sorry, more relevant code below:
>>>>
>>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>>>> params);
>>>> jssc.start();
>>>> jssc.awaitTermination();
>>>> jssc.close();
>>>> ………..
>>>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>>>> sparkConf, Parameters params) {
>>>> JavaStreamingContextFactory factory = new
>>>> JavaStreamingContextFactory() {
>>>>   @Override
>>>>   public JavaStreamingContext create() {
>>>> return createContext(sparkConf, params);
>>>>   }
>>>> };
>>>> return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>>>> factory);
>>>>   }
>>>>
>>>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>>> Parameters params) {
>>>> // Create context with the specified batch interval, in
>>>> milliseconds.
>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>> Durations.milliseconds(params.getBatchDurationMillis()));
>>>> // Set the checkpoint directory, if we're checkpointing
>>>> if (params.isCheckpointed()) {
>>>>   jssc.checkpoint(params.getCheckpointDir());
>>>> }
>>>>
>>>> Set topicsSet = new HashSet(Arrays.asList(params
>>>> .getTopic()));
>>>>
>>>> // Set the Kafka parameters.
>>>> Map kafkaParams = new HashMap();
>>>> kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST,
>>>> params.getBrokerList());
>>>> if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>>>   kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
>>>> .getAutoOffsetReset());
>>>> }
>>>>
>>>> // Create direct Kafka stream with the brokers and the topic.
>>>> JavaPairInputDStream messages =
>>>> KafkaUtils.createDirectStream(
>>>>   jssc,
>>>>   String.class,
>>>>   String.class,
>>>>   StringDecoder.class,
>>>>   StringDecoder.class,
>>>>   kafkaParams,
>>>>   topicsSet);
>>>>
>>>> // See if there's an override of the default checkpoint duration.
>>>> if (params.isCheckpointed() && params.getCheckpoi

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
I just disabled checkpointing in our consumers and I can see that the batch
duration millis set to 20 seconds is now being honored.

Why would that be the case?

And how can we "untie" batch duration millis from checkpointing?

Thanks.

On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger  wrote:

> Well, I'm not sure why you're checkpointing messages.
>
> I'd also put in some logging to see what values are actually being read
> out of your params object for the various settings.
>
>
> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I've stopped the jobs, the workers, and the master. Deleted the contents
>> of the checkpointing dir. Then restarted master, workers, and consumers.
>>
>> I'm seeing the job in question still firing every 10 seconds.  I'm seeing
>> the 10 seconds in the Spark Jobs GUI page as well as our logs.  Seems quite
>> strange given that the jobs used to fire every 1 second, we've switched to
>> 10, now trying to switch to 20 and batch duration millis is not changing.
>>
>> Does anything stand out in the code perhaps?
>>
>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger 
>> wrote:
>>
>>> Have you tried deleting or moving the contents of the checkpoint
>>> directory and restarting the job?
>>>
>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> Sorry, more relevant code below:
>>>>
>>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>>>> params);
>>>> jssc.start();
>>>> jssc.awaitTermination();
>>>> jssc.close();
>>>> ………..
>>>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>>>> sparkConf, Parameters params) {
>>>> JavaStreamingContextFactory factory = new
>>>> JavaStreamingContextFactory() {
>>>>   @Override
>>>>   public JavaStreamingContext create() {
>>>> return createContext(sparkConf, params);
>>>>   }
>>>> };
>>>> return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>>>> factory);
>>>>   }
>>>>
>>>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>>> Parameters params) {
>>>> // Create context with the specified batch interval, in
>>>> milliseconds.
>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>> Durations.milliseconds(params.getBatchDurationMillis()));
>>>> // Set the checkpoint directory, if we're checkpointing
>>>> if (params.isCheckpointed()) {
>>>>   jssc.checkpoint(params.getCheckpointDir());
>>>> }
>>>>
>>>> Set topicsSet = new HashSet(Arrays.asList(params
>>>> .getTopic()));
>>>>
>>>> // Set the Kafka parameters.
>>>> Map kafkaParams = new HashMap();
>>>> kafkaParams.put(KafkaProducerProperties.METADATA_BROKER_LIST,
>>>> params.getBrokerList());
>>>> if (StringUtils.isNotBlank(params.getAutoOffsetReset())) {
>>>>   kafkaParams.put(KafkaConsumerProperties.AUTO_OFFSET_RESET, params
>>>> .getAutoOffsetReset());
>>>> }
>>>>
>>>> // Create direct Kafka stream with the brokers and the topic.
>>>> JavaPairInputDStream messages =
>>>> KafkaUtils.createDirectStream(
>>>>   jssc,
>>>>   String.class,
>>>>   String.class,
>>>>   StringDecoder.class,
>>>>   StringDecoder.class,
>>>>   kafkaParams,
>>>>   topicsSet);
>>>>
>>>> // See if there's an override of the default checkpoint duration.
>>>> if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>>>>   messages.checkpoint(Durations.milliseconds(params
>>>> .getCheckpointMillis()));
>>>> }
>>>>
>>>> JavaDStream messageBodies = messages.map(new
>>>> Function, String>() {
>>>>   @Override
>>>>   public String call(Tuple2 tuple2) {
>>>> return tuple2._2();
>>>>   }
>>>> });
>

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
>> Why are you checkpointing the direct kafka stream? It serves not purpose.

Could you elaborate on what you mean?

Our goal is fault tolerance.  If a consumer is killed or stopped midstream,
we want to resume where we left off next time the consumer is restarted.

How would that be "not surving a purpose"?  This is already working for
us.  From our testing, we indeed resume where we left off, which is not
possible without checkpointing.  If checkpointing is turned off, we'll
resume with the later Kafka topic entries, which would lead to skipping
over some entries.

Please elaborate.

We need both checkpointing and the ability to set batch duration millis.
The Spark API provides both capabilities but somehow if checkpointing is
turned on, our batch duration millis are always set to 10 seconds
internally by Spark.  What is the resolution?


On Tue, Sep 8, 2015 at 7:23 PM, Tathagata Das  wrote:

> Why are you checkpointing the direct kafka stream? It serves not purpose.
>
> TD
>
> On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> I just disabled checkpointing in our consumers and I can see that the
>> batch duration millis set to 20 seconds is now being honored.
>>
>> Why would that be the case?
>>
>> And how can we "untie" batch duration millis from checkpointing?
>>
>> Thanks.
>>
>> On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger 
>> wrote:
>>
>>> Well, I'm not sure why you're checkpointing messages.
>>>
>>> I'd also put in some logging to see what values are actually being read
>>> out of your params object for the various settings.
>>>
>>>
>>> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> I've stopped the jobs, the workers, and the master. Deleted the
>>>> contents of the checkpointing dir. Then restarted master, workers, and
>>>> consumers.
>>>>
>>>> I'm seeing the job in question still firing every 10 seconds.  I'm
>>>> seeing the 10 seconds in the Spark Jobs GUI page as well as our logs.
>>>> Seems quite strange given that the jobs used to fire every 1 second, we've
>>>> switched to 10, now trying to switch to 20 and batch duration millis is not
>>>> changing.
>>>>
>>>> Does anything stand out in the code perhaps?
>>>>
>>>> On Tue, Sep 8, 2015 at 9:53 AM, Cody Koeninger 
>>>> wrote:
>>>>
>>>>> Have you tried deleting or moving the contents of the checkpoint
>>>>> directory and restarting the job?
>>>>>
>>>>> On Fri, Sep 4, 2015 at 8:02 PM, Dmitry Goldenberg <
>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> Sorry, more relevant code below:
>>>>>>
>>>>>> SparkConf sparkConf = createSparkConf(appName, kahunaEnv);
>>>>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>>>>> createCheckpointedContext(sparkConf, params) : createContext(
>>>>>> sparkConf, params);
>>>>>> jssc.start();
>>>>>> jssc.awaitTermination();
>>>>>> jssc.close();
>>>>>> ………..
>>>>>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>>>>>> sparkConf, Parameters params) {
>>>>>> JavaStreamingContextFactory factory = new
>>>>>> JavaStreamingContextFactory() {
>>>>>>   @Override
>>>>>>   public JavaStreamingContext create() {
>>>>>> return createContext(sparkConf, params);
>>>>>>   }
>>>>>> };
>>>>>> return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>>>>>> factory);
>>>>>>   }
>>>>>>
>>>>>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>>>>> Parameters params) {
>>>>>> // Create context with the specified batch interval, in
>>>>>> milliseconds.
>>>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>>>> Durations.milliseconds(params.getBatchDurationMillis()));
>>>>>> // Set the checkpoint directory, if we're checkpointing
>>>>>> if (params.isCheckpointed()) {
>>>>>>   jssc.checkpoint(params.getCheckpointDi

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
That is good to know. However, that doesn't change the problem I'm seeing.
Which is that, even with that piece of code commented out
(stream.checkpoint()), the batch duration millis aren't getting changed
unless I take checkpointing completely out.

In other words, this commented out:

//if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
//
messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
//}

doesn't change the "stickiness" of the batch duration millis value.
However, if the context is not checkpointed, the problem goes away and the
batch duration millis setting is properly updated.

I'll take the commented out piece out however I still need to figure out
how to fix the batch duration millis so I can also keep the checkpointed
context.

The checkpointed context is basically created as I stated before, note the
invokation of the checkpoint() method:

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
Parameters params) {
JavaStreamingContextFactory factory = new JavaStreamingContextFactory()
{
  @Override
  public JavaStreamingContext create() {
return createContext(sparkConf, params);
  }
};
return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
factory);
  }

  private JavaStreamingContext createContext(SparkConf sparkConf,
Parameters params) {
// Create context with the specified batch interval, in milliseconds.
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(params.getBatchDurationMillis()));
// Set the checkpoint directory, if we're checkpointing
if (params.isCheckpointed()) {
  jssc.checkpoint(params.getCheckpointDir());
}

...

On Tue, Sep 8, 2015 at 11:14 PM, Tathagata Das  wrote:

> Calling directKafkaStream.checkpoint() will make the system write the raw
> kafka data into HDFS files (that is, RDD checkpointing). This is completely
> unnecessary with Direct Kafka because it already tracks the offset of data
> in each batch
>


> (which checkpoint is enabled using
> streamingContext.checkpoint(checkpointDir)) and can recover from failure by
> reading the exact same data back from Kafka.
>
>
> TD
>
> On Tue, Sep 8, 2015 at 4:38 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> >> Why are you checkpointing the direct kafka stream? It serves not
>> purpose.
>>
>> Could you elaborate on what you mean?
>>
>> Our goal is fault tolerance.  If a consumer is killed or stopped
>> midstream, we want to resume where we left off next time the consumer is
>> restarted.
>>
>> How would that be "not surving a purpose"?  This is already working for
>> us.  From our testing, we indeed resume where we left off, which is not
>> possible without checkpointing.  If checkpointing is turned off, we'll
>> resume with the later Kafka topic entries, which would lead to skipping
>> over some entries.
>>
>> Please elaborate.
>>
>> We need both checkpointing and the ability to set batch duration millis.
>> The Spark API provides both capabilities but somehow if checkpointing is
>> turned on, our batch duration millis are always set to 10 seconds
>> internally by Spark.  What is the resolution?
>>
>>
>> On Tue, Sep 8, 2015 at 7:23 PM, Tathagata Das 
>> wrote:
>>
>>> Why are you checkpointing the direct kafka stream? It serves not purpose.
>>>
>>> TD
>>>
>>> On Tue, Sep 8, 2015 at 9:35 AM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> I just disabled checkpointing in our consumers and I can see that the
>>>> batch duration millis set to 20 seconds is now being honored.
>>>>
>>>> Why would that be the case?
>>>>
>>>> And how can we "untie" batch duration millis from checkpointing?
>>>>
>>>> Thanks.
>>>>
>>>> On Tue, Sep 8, 2015 at 11:48 AM, Cody Koeninger 
>>>> wrote:
>>>>
>>>>> Well, I'm not sure why you're checkpointing messages.
>>>>>
>>>>> I'd also put in some logging to see what values are actually being
>>>>> read out of your params object for the various settings.
>>>>>
>>>>>
>>>>> On Tue, Sep 8, 2015 at 10:24 AM, Dmitry Goldenberg <
>>>>> dgoldenberg...@gmail.com> wrote:
>>>>>
>>>>>> I've stopped the jobs, the workers, and the master. Deleted the
>>>>>> contents of the checkpointing dir. Then restarted master,

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-08 Thread Dmitry Goldenberg
What's wrong with creating a checkpointed context??  We WANT checkpointing,
first of all.  We therefore WANT the checkpointed context.

Second of all, it's not true that we're loading the checkpointed context
independent of whether params.isCheckpointed() is true.  I'm quoting the
code again:

// This is NOT loading a checkpointed context if isCheckpointed() is false.
JavaStreamingContext jssc = params.isCheckpointed() ?
createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
params);

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
Parameters params) {
JavaStreamingContextFactory factory = new JavaStreamingContextFactory()
{
  @Override
  public JavaStreamingContext create() {
return createContext(sparkConf, params);
  }
};
return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
factory);
  }

  private JavaStreamingContext createContext(SparkConf sparkConf,
Parameters params) {
// Create context with the specified batch interval, in milliseconds.
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(params.getBatchDurationMillis()));
// Set the checkpoint directory, if we're checkpointing
if (params.isCheckpointed()) {
  jssc.checkpoint(params.getCheckpointDir());

}
...
Again, this is *only* calling context.checkpoint() if isCheckpointed() is
true.  And we WANT it to be true.

What am I missing here?



On Tue, Sep 8, 2015 at 11:42 PM, Tathagata Das  wrote:

> Well, you are returning JavaStreamingContext.getOrCreate(params.
> getCheckpointDir(), factory);
> That is loading the checkpointed context, independent of whether params
> .isCheckpointed() is true.
>
>
>
>
> On Tue, Sep 8, 2015 at 8:28 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> That is good to know. However, that doesn't change the problem I'm
>> seeing. Which is that, even with that piece of code commented out
>> (stream.checkpoint()), the batch duration millis aren't getting changed
>> unless I take checkpointing completely out.
>>
>> In other words, this commented out:
>>
>> //if (params.isCheckpointed() && params.getCheckpointMillis() > 0L) {
>> //
>> messages.checkpoint(Durations.milliseconds(params.getCheckpointMillis()));
>> //}
>>
>> doesn't change the "stickiness" of the batch duration millis value.
>> However, if the context is not checkpointed, the problem goes away and the
>> batch duration millis setting is properly updated.
>>
>> I'll take the commented out piece out however I still need to figure out
>> how to fix the batch duration millis so I can also keep the checkpointed
>> context.
>>
>> The checkpointed context is basically created as I stated before, note
>> the invokation of the checkpoint() method:
>>
>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>> sparkConf, Parameters params) {
>> JavaStreamingContextFactory factory = new
>> JavaStreamingContextFactory() {
>>   @Override
>>   public JavaStreamingContext create() {
>> return createContext(sparkConf, params);
>>   }
>> };
>> return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>> factory);
>>   }
>>
>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>> Parameters params) {
>> // Create context with the specified batch interval, in milliseconds.
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.milliseconds(params.getBatchDurationMillis()));
>> // Set the checkpoint directory, if we're checkpointing
>> if (params.isCheckpointed()) {
>>   jssc.checkpoint(params.getCheckpointDir());
>> }
>>
>> ...
>>
>> On Tue, Sep 8, 2015 at 11:14 PM, Tathagata Das 
>> wrote:
>>
>>> Calling directKafkaStream.checkpoint() will make the system write the
>>> raw kafka data into HDFS files (that is, RDD checkpointing). This is
>>> completely unnecessary with Direct Kafka because it already tracks the
>>> offset of data in each batch
>>>
>>
>>
>>> (which checkpoint is enabled using
>>> streamingContext.checkpoint(checkpointDir)) and can recover from failure by
>>> reading the exact same data back from Kafka.
>>>
>>>
>>> TD
>>>
>>> On Tue, Sep 8, 2015 at 4:38 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> >> Why are you checkpointing the direct kafka stream? It se

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-09 Thread Dmitry Goldenberg
>> when you use getOrCreate, and there exists a valid checkpoint, it will
always return the context from the checkpoint and not call the factory.
Simple way to see whats going on is to print something in the factory to
verify whether it is ever called.

This is probably OK. Seems to explain why we were getting a sticky batch
duration millis value. Once I blew away all the checkpointing directories
and unplugged the data checkpointing (while keeping the metadata
checkpointing) the batch duration millis was no longer sticky.

So, there doesn't seem to be a way for metadata checkpointing to override
its checkpoint duration millis, is there?  Is the default there
max(batchdurationmillis, 10seconds)?  Is there a way to override this?
Thanks.





On Wed, Sep 9, 2015 at 2:44 PM, Tathagata Das  wrote:

>
>
> See inline.
>
> On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> What's wrong with creating a checkpointed context??  We WANT
>> checkpointing, first of all.  We therefore WANT the checkpointed context.
>>
>> Second of all, it's not true that we're loading the checkpointed context
>> independent of whether params.isCheckpointed() is true.  I'm quoting the
>> code again:
>>
>> // This is NOT loading a checkpointed context if isCheckpointed() is
>> false.
>> JavaStreamingContext jssc = params.isCheckpointed() ?
>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>> params);
>>
>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>> sparkConf, Parameters params) {
>> JavaStreamingContextFactory factory = new
>> JavaStreamingContextFactory() {
>>   @Override
>>   public JavaStreamingContext create() {
>> return createContext(sparkConf, params);
>>   }
>> };
>> return *JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>> factory);*
>>
> ^   when you use getOrCreate, and there exists a valid checkpoint, it
> will always return the context from the checkpoint and not call the
> factory. Simple way to see whats going on is to print something in the
> factory to verify whether it is ever called.
>
>
>
>
>
>>   }
>>
>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>> Parameters params) {
>> // Create context with the specified batch interval, in milliseconds.
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.milliseconds(params.getBatchDurationMillis()));
>> // Set the checkpoint directory, if we're checkpointing
>> if (params.isCheckpointed()) {
>>   jssc.checkpoint(params.getCheckpointDir());
>>
>> }
>> ...
>> Again, this is *only* calling context.checkpoint() if isCheckpointed() is
>> true.  And we WANT it to be true.
>>
>> What am I missing here?
>>
>>
>>


Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-10 Thread Dmitry Goldenberg
>> The whole point of checkpointing is to recover the *exact* computation
where it left off.

That makes sense. We were looking at the metadata checkpointing and the
data checkpointing, and with data checkpointing, you can specify a
checkpoint duration value. With the metadata checkpointing, there doesn't
seem to be a way, which may be the intent but it wasn't clear why there's a
way to override one duration (for data) but not the other (for metadata).

The basic feel was that we'd want to minimize the number of times Spark
Streaming is doing the checkpointing I/O. In other words, some sort of
sweet spot value where we do checkpointing frequently enough without
performing I/O too frequently. Finding that sweet spot would mean
experimenting with the checkpoint duration millis but that parameter
doesn't appear to be exposed in case of metadata checkpointing.



On Wed, Sep 9, 2015 at 10:39 PM, Tathagata Das  wrote:

> The whole point of checkpointing is to recover the *exact* computation
> where it left of.
> If you want any change in the specification of the computation (which
> includes any intervals), then you cannot recover from checkpoint as it can
> be an arbitrarily complex issue to deal with changes in the specs,
> especially because a lot of specs are tied to each other (e.g. checkpoint
> interval dictates other things like clean up intervals, etc.)
>
> Why do you need to change the checkpointing interval at the time of
> recovery? Trying to understand your usecase.
>
>
> On Wed, Sep 9, 2015 at 12:03 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> >> when you use getOrCreate, and there exists a valid checkpoint, it will
>> always return the context from the checkpoint and not call the factory.
>> Simple way to see whats going on is to print something in the factory to
>> verify whether it is ever called.
>>
>> This is probably OK. Seems to explain why we were getting a sticky batch
>> duration millis value. Once I blew away all the checkpointing directories
>> and unplugged the data checkpointing (while keeping the metadata
>> checkpointing) the batch duration millis was no longer sticky.
>>
>> So, there doesn't seem to be a way for metadata checkpointing to override
>> its checkpoint duration millis, is there?  Is the default there
>> max(batchdurationmillis, 10seconds)?  Is there a way to override this?
>> Thanks.
>>
>>
>>
>>
>>
>> On Wed, Sep 9, 2015 at 2:44 PM, Tathagata Das 
>> wrote:
>>
>>>
>>>
>>> See inline.
>>>
>>> On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg <
>>> dgoldenberg...@gmail.com> wrote:
>>>
>>>> What's wrong with creating a checkpointed context??  We WANT
>>>> checkpointing, first of all.  We therefore WANT the checkpointed context.
>>>>
>>>> Second of all, it's not true that we're loading the checkpointed
>>>> context independent of whether params.isCheckpointed() is true.  I'm
>>>> quoting the code again:
>>>>
>>>> // This is NOT loading a checkpointed context if isCheckpointed() is
>>>> false.
>>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>>>> params);
>>>>
>>>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>>>> sparkConf, Parameters params) {
>>>> JavaStreamingContextFactory factory = new
>>>> JavaStreamingContextFactory() {
>>>>   @Override
>>>>   public JavaStreamingContext create() {
>>>> return createContext(sparkConf, params);
>>>>   }
>>>> };
>>>> return *JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>>>> factory);*
>>>>
>>> ^   when you use getOrCreate, and there exists a valid checkpoint,
>>> it will always return the context from the checkpoint and not call the
>>> factory. Simple way to see whats going on is to print something in the
>>> factory to verify whether it is ever called.
>>>
>>>
>>>
>>>
>>>
>>>>   }
>>>>
>>>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>>> Parameters params) {
>>>> // Create context with the specified batch interval, in
>>>> milliseconds.
>>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>>> Durations.milliseconds(params.getBatchDurationMillis()));
>>>> // Set the checkpoint directory, if we're checkpointing
>>>> if (params.isCheckpointed()) {
>>>>   jssc.checkpoint(params.getCheckpointDir());
>>>>
>>>> }
>>>> ...
>>>> Again, this is *only* calling context.checkpoint() if isCheckpointed()
>>>> is true.  And we WANT it to be true.
>>>>
>>>> What am I missing here?
>>>>
>>>>
>>>>
>>
>


Re: Using KafkaDirectStream, stopGracefully and exceptions

2015-09-10 Thread Dmitry Goldenberg
>> checkpoints can't be used between controlled restarts

Is that true? If so, why? From my testing, checkpoints appear to be working
fine, we get the data we've missed between the time the consumer went down
and the time we brought it back up.

>> If I cannot make checkpoints between code upgrades, does it mean that
Spark does not help me at all with keeping my Kafka offsets? Does it mean,
that I have to implement my own storing to/initalization of offsets from
Zookeeper?

By code upgrades, are code changes to the consumer program meant?

If that is the case, one idea we've been entertaining is that, if the
consumer changes, especially if its configuration parameters change, it
means that some older configuration may still be stuck in the
checkpointing.  What we'd do in this case is, prior to starting the
consumer, blow away the checkpointing directory and re-consume from Kafka
from the smallest offsets.  In our case, it's OK to re-process; I realize
that in many cases that may not be an option.  If that's the case then it
would seem to follow that you have to manage offsets in Zk...

Another thing to consider would be to treat upgrades operationally. In
that, if an upgrade is to happen, consume the data up to a certain point
then bring the system down for an upgrade. Remove checkpointing. Restart
everything; the system would now be rebuilding the checkpointing and using
your upgraded consumers.  (Again, this may not be possible in some systems
where the data influx is constant and/or the data is mission critical)...

Perhaps this discussion implies that there may be a new feature in Spark
where it intelligently drops the checkpointing or allows you to selectively
pluck out and drop some items prior to restarting...




On Thu, Sep 10, 2015 at 6:22 AM, Akhil Das 
wrote:

> This consumer pretty much covers all those scenarios you listed
> github.com/dibbhatt/kafka-spark-consumer Give it a try.
>
> Thanks
> Best Regards
>
> On Thu, Sep 10, 2015 at 3:32 PM, Krzysztof Zarzycki 
> wrote:
>
>> Hi there,
>> I have a problem with fulfilling all my needs when using Spark Streaming
>> on Kafka. Let me enumerate my requirements:
>> 1. I want to have at-least-once/exactly-once processing.
>> 2. I want to have my application fault & simple stop tolerant. The Kafka
>> offsets need to be tracked between restarts.
>> 3. I want to be able to upgrade code of my application without losing
>> Kafka offsets.
>>
>> Now what my requirements imply according to my knowledge:
>> 1. implies using new Kafka DirectStream.
>> 2. implies  using checkpointing. kafka DirectStream will write offsets to
>> the checkpoint as well.
>> 3. implies that checkpoints can't be used between controlled restarts. So
>> I need to install shutdownHook with ssc.stop(stopGracefully=true) (here is
>> a description how:
>> https://metabroadcast.com/blog/stop-your-spark-streaming-application-gracefully
>> )
>>
>> Now my problems are:
>> 1. If I cannot make checkpoints between code upgrades, does it mean that
>> Spark does not help me at all with keeping my Kafka offsets? Does it mean,
>> that I have to implement my own storing to/initalization of offsets from
>> Zookeeper?
>> 2. When I set up shutdownHook and my any executor throws an exception, it
>> seems that application does not fail, but stuck in running state. Is that
>> because stopGracefully deadlocks on exceptions? How to overcome this
>> problem? Maybe I can avoid setting shutdownHook and there is other way to
>> stop gracefully your app?
>>
>> 3. If I somehow overcome 2., is it enough to just stop gracefully my app
>> to be able to upgrade code & not lose Kafka offsets?
>>
>>
>> Thank you a lot for your answers,
>> Krzysztof Zarzycki
>>
>>
>>
>>
>


A way to kill laggard jobs?

2015-09-11 Thread Dmitry Goldenberg
Is there a way to kill a laggard Spark job manually, and more importantly,
is there a way to do it programmatically based on a configurable timeout
value?

Thanks.


A way to timeout and terminate a laggard 'Stage' ?

2015-09-14 Thread Dmitry Goldenberg
Is there a way in Spark to automatically terminate laggard "stage's", ones
that appear to be hanging?   In other words, is there a timeout for
processing of a given RDD?

In the Spark GUI, I see the "kill" function for a given Stage under
'Details for Job <...>".

Is there something in Spark that would identify and kill laggards
proactively?

Thanks.


Re: A way to timeout and terminate a laggard 'Stage' ?

2015-09-15 Thread Dmitry Goldenberg
Thanks, Mark, will look into that...

On Tue, Sep 15, 2015 at 12:33 PM, Mark Hamstra 
wrote:

> There is the Async API (
> https://github.com/clearstorydata/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala),
> which makes use of FutureAction (
> https://github.com/clearstorydata/spark/blob/master/core/src/main/scala/org/apache/spark/FutureAction.scala).
> You could also wrap up your Jobs in Futures on your own.
>
> On Mon, Sep 14, 2015 at 11:37 PM, Akhil Das 
> wrote:
>
>> As of now i think its a no. Not sure if its a naive approach, but yes you
>> can have a separate program to keep an eye in the webui (possibly parsing
>> the content) and make it trigger the kill task/job once it detects a lag.
>> (Again you will have to figure out the correct numbers before killing any
>> job)
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Sep 14, 2015 at 10:40 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> Is there a way in Spark to automatically terminate laggard "stage's",
>>> ones that appear to be hanging?   In other words, is there a timeout for
>>> processing of a given RDD?
>>>
>>> In the Spark GUI, I see the "kill" function for a given Stage under
>>> 'Details for Job <...>".
>>>
>>> Is there something in Spark that would identify and kill laggards
>>> proactively?
>>>
>>> Thanks.
>>>
>>
>>
>


Reason for Kafka topic existence check / "Does the topic exist?" error

2016-10-08 Thread Dmitry Goldenberg
Hi,

I am trying to start up a simple consumer that streams from a Kafka topic,
using Spark 2.0.0:

   - spark-streaming_2.11
   - spark-streaming-kafka-0-8_2.11

I was getting an error as below until I created the topic in Kafka. From
integrating Spark 1.5, I never used to hit this check; we were able to
start all of our Spark Kafka consumers, then start the producers, and have
Kafka automatically create the topics once the first message for a given
topic was published.

Is there something I might be doing to cause this topic existence check in
KafkaCluster.scala to kick in? I'd much rather be able to not have to
pre-create the topics before I start the consumers.  Any thoughts/comments
would be appreciated.

Thanks.
- Dmitry



Exception in thread "main" org.apache.spark.SparkException:
java.nio.channels.ClosedChannelException

java.nio.channels.ClosedChannelException

org.apache.spark.SparkException: Error getting partition metadata for
''. Does the topic exist?

at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)


at
org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$checkErrors$1.apply(KafkaCluster.scala:373)


at scala.util.Either.fold(Either.scala:98)

at
org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:372)


at
org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)


at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)


at
org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)


at
org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)


at
com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.createContext(KafkaSparkStreamingDriver.java:253)


at
com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.execute(KafkaSparkStreamingDriver.java:166)


at
com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.main(KafkaSparkStreamingDriver.java:305)


at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)


at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)


at java.lang.reflect.Method.invoke(Method.java:498)

at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:729)


at
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)

at
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)

at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


SparkConf.setExecutorEnv works differently in Spark 2.0.0

2016-10-08 Thread Dmitry Goldenberg
Hi,

We have some code which worked with Spark 1.5.0, which allowed us to pass
system properties to the executors:

SparkConf sparkConf = new SparkConf().setAppName(appName);
...
for each property:
sparkConf.setExecutorEnv(propName, propValue);

Our current Spark Streaming driver is compiled with spark-streaming_2.11 :
2.0.0.

We no longer get the properties on the executor side.  Tried the following
approaches and none of these worked:

sparkConf.set("spark.MyPropName", "MyPropValue");
sparkConf.setExecutorEnv("MyProp1", "MyPropVal1");
sparkConf.setExecutorEnv("SPARK_JAVA_OPTS", "-Daaa=bbb");

On the executor side, I'm retrieving with System.getenv; fall back on
System.getProperty() if the property ends up being in the properties rather
than the environment.

The only way I was able to get this to work is by defining a custom Spark
conf file, with every property name being prefixed with "spark." and then
passing the conf properties file via the --properties-file  to
spark-submit.

Is anyone aware of a change that may have caused setExecutorEnv to work
differently in 2.0.0?

Is there any other way to pass environment variables or system properties
to the executor side, preferably a programmatic way rather than
configuration-wise?

Thanks,
- Dmitry


Re: Reason for Kafka topic existence check / "Does the topic exist?" error

2016-10-29 Thread Dmitry Goldenberg
Cody,

Thanks for your comments.

The way I'm reading the Kafka documentation (
https://kafka.apache.org/documentation) is that auto.create.topics.enable
is set to true by default. Right now it's not set in our server.properties
on the Kafka broker side so I would imagine that the first request to
publish a document into topic X would cause X to be created, as
auto.create.topics.enable is presumably defaulted to true.

Basically, I used to be able to start a streaming Kafka job first, without
the topic X already existing, then let the producer publish the first (and
all subsequent) documents and the consumer would get the documents from
that point.

This mode is not working anymore. Despite auto.create.topics.enable
presumably defaulting to true (?), I'm getting the "Does the topic exist"
exception.

Not a big problem but raises the question of, when would the topic be
"auto-created" if not on the first document being published to it?

It was nice when it was working because we didn't have to operationalize
topic creation. Not a big deal but now we'll have to make sure we execute
the 'create-topics' type of task or shell script at install time.

This seems like a Kafka doc issue potentially, to explain what exactly one
can expect from the auto.create.topics.enable flag.

-Dmitry


On Sat, Oct 8, 2016 at 1:26 PM, Cody Koeninger  wrote:

> So I just now retested this with 1.5.2, and 2.0.0, and the behavior is
> exactly the same across spark versions.
>
> If the topic hasn't been created, you will get that error on startup,
> because the topic doesn't exist and thus doesn't have metadata.
>
> If you have auto.create.topics.enable set to true on the broker
> config, the request will fairly quickly lead to the topic being
> created after the fact.
>
> All you have to do is hit up-arrow-enter and re-submit the spark job,
> the second time around the topic will exist.  That seems pretty low
> effort.
>
> I'd rather stick with having an early error for those of us that
> prefer to run with auto.create set to false (because it makes sure the
> topic is actually set up the way you want, reduces the likelihood of
> spurious topics being created, etc).
>
>
>
> On Sat, Oct 8, 2016 at 11:44 AM, Dmitry Goldenberg
>  wrote:
> > Hi,
> >
> > I am trying to start up a simple consumer that streams from a Kafka
> topic,
> > using Spark 2.0.0:
> >
> > spark-streaming_2.11
> > spark-streaming-kafka-0-8_2.11
> >
> > I was getting an error as below until I created the topic in Kafka. From
> > integrating Spark 1.5, I never used to hit this check; we were able to
> start
> > all of our Spark Kafka consumers, then start the producers, and have
> Kafka
> > automatically create the topics once the first message for a given topic
> was
> > published.
> >
> > Is there something I might be doing to cause this topic existence check
> in
> > KafkaCluster.scala to kick in? I'd much rather be able to not have to
> > pre-create the topics before I start the consumers.  Any
> thoughts/comments
> > would be appreciated.
> >
> > Thanks.
> > - Dmitry
> >
> > 
> >
> > Exception in thread "main" org.apache.spark.SparkException:
> > java.nio.channels.ClosedChannelException
> >
> > java.nio.channels.ClosedChannelException
> >
> > org.apache.spark.SparkException: Error getting partition metadata for
> > ''. Does the topic exist?
> >
> > at
> > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> checkErrors$1.apply(KafkaCluster.scala:373)
> >
> > at
> > org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$
> checkErrors$1.apply(KafkaCluster.scala:373)
> >
> > at scala.util.Either.fold(Either.scala:98)
> >
> > at
> > org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.
> scala:372)
> >
> > at
> > org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.
> scala:222)
> >
> > at
> > org.apache.spark.streaming.kafka.KafkaUtils$.
> createDirectStream(KafkaUtils.scala:484)
> >
> > at
> > org.apache.spark.streaming.kafka.KafkaUtils$.
> createDirectStream(KafkaUtils.scala:607)
> >
> > at
> > org.apache.spark.streaming.kafka.KafkaUtils.
> createDirectStream(KafkaUtils.scala)
> >
> > at
> > com.citi.totalconduct.consumer.kafka.spark.KafkaSparkStreamingDriver.
> createContext(KafkaSparkStreamingDriver.java:253)
> >
> 

subtractByKey modifes values in the source RDD

2016-11-23 Thread Dmitry Dzhus
I'm experiencing a problem with subtractByKey using Spark 2.0.2 with
Scala 2.11.x:

Relevant code:

object Types {
   type ContentId = Int
   type ContentKey = Tuple2[Int, ContentId]
   type InternalContentId = Int
}

val inverseItemIDMap: RDD[(InternalContentId, ContentKey)] =
itemIDMap.map(_.swap).cache()
logger.info(s"Built an inverse map of ${inverseItemIDMap.count()} item
IDs")
logger.info(inverseItemIDMap.collect().mkString("I->E ", "\nI->E ", ""))

val superfluousItems: RDD[(InternalContentId, Int)] = .. .cache()
logger.info(superfluousItems.collect().mkString("SI ", "\nSI ", ""))

val filteredInverseItemIDMap: RDD[(InternalContentId, ContentKey)] = 
  inverseItemIDMap.subtractByKey(superfluousItems).cache() // <<===!!!
logger.info(s"${filteredInverseItemIDMap.count()} items in the filtered
inverse ID mapping")
logger.info(filteredInverseItemIDMap.collect().mkString("F I->E ", "\nF
I->E ", ""))

The operation in question is .subtractByKey. Both RDDs involved are
cached and forced via count() prior to calling subtractByKey, so I
would expect the result to be unaffected by how exactly
superfluousItems is built.

I added debugging output and filtered the resulting logs by relevant
InternalContentId values (829911, 830071). Output:

Built an inverse map of 827354 item IDs
.
.
I->E (829911,(2,1135081))
I->E (830071,(1,2295102))
.
.
748190 items in the training set had less than 28 ratings
SI (829911,3)
.
.
79164 items in the filtered inverse ID mapping
F I->E (830071,(2,1135081))

There's no element with key 830071 in superfluousItems (SI), so it's
not removed from the source RDD. However, its value is for some reason
replaced with the one from key 829911. How could this be? I cannot
reproduce it locally - only when running on a multi-machine cluster.
Is this a bug or I'm missing something?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/subtractByKey-modifes-values-in-the-source-RDD-tp28121.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe e-mail: user-unsubscr...@spark.apache.org



NoNodeAvailableException (None of the configured nodes are available) error when trying to push data to Elastic from a Spark job

2017-02-03 Thread Dmitry Goldenberg
Hi,

Any reason why we might be getting this error?  The code seems to work fine
in the non-distributed mode but the same code when run from a Spark job is
not able to get to Elastic.

Spark version: 2.0.1 built for Hadoop 2.4, Scala 2.11
Elastic version: 2.3.1

I've verified the Elastic hosts and the cluster name.

The spot in the code where this happens is:

 ClusterHealthResponse clusterHealthResponse = client.admin().cluster()

  .prepareHealth()

  .setWaitForGreenStatus()

  .setTimeout(TimeValue.*timeValueSeconds*(10))

  .get();


Stack trace:


Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$
scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$
abortStage$1.apply(DAGScheduler.scala:1442)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$
abortStage$1.apply(DAGScheduler.scala:1441)

at scala.collection.mutable.ResizableArray$class.foreach(
ResizableArray.scala:59)

at scala.collection.mutable.ArrayBuffer.foreach(
ArrayBuffer.scala:48)

at org.apache.spark.scheduler.DAGScheduler.abortStage(
DAGScheduler.scala:1441)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$
handleTaskSetFailed$1.apply(DAGScheduler.scala:811)

at org.apache.spark.scheduler.DAGScheduler$$anonfun$
handleTaskSetFailed$1.apply(DAGScheduler.scala:811)

at scala.Option.foreach(Option.scala:257)

at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(
DAGScheduler.scala:811)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
doOnReceive(DAGScheduler.scala:1667)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
onReceive(DAGScheduler.scala:1622)

at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.
onReceive(DAGScheduler.scala:1611)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

at org.apache.spark.scheduler.DAGScheduler.runJob(
DAGScheduler.scala:632)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
apply(RDD.scala:902)

at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.
apply(RDD.scala:900)

at org.apache.spark.rdd.RDDOperationScope$.withScope(
RDDOperationScope.scala:151)

at org.apache.spark.rdd.RDDOperationScope$.withScope(
RDDOperationScope.scala:112)

at org.apache.spark.rdd.RDD.withScope(RDD.scala:358)

at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:900)

at org.apache.spark.api.java.JavaRDDLike$class.
foreachPartition(JavaRDDLike.scala:218)

at org.apache.spark.api.java.AbstractJavaRDDLike.
foreachPartition(JavaRDDLike.scala:45)

at com.myco.MyDriver$3.call(com.myco.MyDriver.java:214)

at com.myco.MyDriver$3.call(KafkaSparkStreamingDriver.java:201)

at org.apache.spark.streaming.api.java.JavaDStreamLike$$
anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)

at org.apache.spark.streaming.api.java.JavaDStreamLike$$
anonfun$foreachRDD$1.apply(JavaDStreamLike.scala:272)

at org.apache.spark.streaming.dstream.DStream$$anonfun$
foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)

at org.apache.spark.streaming.dstream.DStream$$anonfun$
foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:627)

at org.apache.spark.streaming.dstream.ForEachDStream$$
anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)

at org.apache.spark.streaming.dstream.ForEachDStream$$
anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)

at org.apache.spark.streaming.dstream.ForEachDStream$$
anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)

at org.apache.spark.streaming.dstream.DStream.
createRDDWithLocalProperties(DStream.scala:415)

at org.apache.spark.streaming.dstream.ForEachDStream$$
anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)

at org.apache.spark.streaming.dstream.ForEachDStream$$
anonfun$1.apply(ForEachDStream.scala:50)

at org.apache.spark.streaming.dstream.ForEachDStream$$
anonfun$1.apply(ForEachDStream.scala:50)

at scala.util.Try$.apply(Try.scala:192)

at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)

at org.apache.spark.streaming.scheduler.JobScheduler$
JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)

at org.apache.spark.streaming.scheduler.JobScheduler$
JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)

at org.apache.spark.streaming.scheduler.JobScheduler$
JobHandler$$anonfun$

How to fix error "Failed to get records for..." after polling for 120000

2017-04-18 Thread Dmitry Goldenberg
Hi,

I was wondering if folks have some ideas, recommendation for how to fix
this error (full stack trace included below).

We're on Kafka 0.10.0.0 and spark_streaming_2.11 v. 2.0.0.

We've tried a few things as suggested in these sources:

   -
   
http://stackoverflow.com/questions/42264669/spark-streaming-assertion-failed-failed-to-get-records-for-spark-executor-a-gro
   - https://issues.apache.org/jira/browse/SPARK-19275
   - https://issues.apache.org/jira/browse/SPARK-17147

but still seeing the error.

We'd appreciate any clues or recommendations.
Thanks,
- Dmitry



Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 3 in stage 4227.0 failed 1 times, most recent
failure: Lost task 3.0 in stage 4227.0 (TID 33819, localhost):
java.lang.AssertionError: assertion failed: Failed to get records for
spark-executor-Group-Consumer-Group-1 Topic1 0 476289 after polling for
12

at scala.Predef$.assert(Predef.scala:170)

at
org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:74)

at
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:227)

at
org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:193)

at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)

at
scala.collection.convert.Wrappers$IteratorWrapper.next(Wrappers.scala:31)

at
com.myco.ProcessPartitionFunction.call(ProcessPartitionFunction.java:70)

at
com.myco.ProcessPartitionFunction.call(ProcessPartitionFunction.java:24)

at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)

at
org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:218)

at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)

at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:902)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)

at
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1916)

at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)

at org.apache.spark.scheduler.Task.run(Task.scala:86)

at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)

at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:745)



Driver stacktrace:

at org.apache.spark.scheduler.DAGScheduler.org
<http://org.apache.spark.scheduler.dagscheduler.org/>
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)

at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)

at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)

at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)

at scala.Option.foreach(Option.scala:257)

at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)

at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)

at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

at
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1890)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1903)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1916)

at org.apache.spark.SparkContext.runJob(SparkContext.scala:1930)

at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:902)

at
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:900)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)

at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperat

Re: Why do checkpoints work the way they do?

2017-09-11 Thread Dmitry Naumenko
+1 for me for this question. If there any constraints in restoring
checkpoint for Structured Streaming, they should be documented.


2017-08-31 9:20 GMT+03:00 张万新 :

> So is there any documents demonstrating in what condition can my
> application recover from the same checkpoint and in what condition not?
>
> Tathagata Das 于2017年8月30日周三 下午1:20写道:
>
>> Hello,
>>
>> This is an unfortunate design on my part when I was building DStreams :)
>>
>> Fortunately, we learnt from our mistakes and built Structured Streaming
>> the correct way. Checkpointing in Structured Streaming stores only the
>> progress information (offsets, etc.), and the user can change their
>> application code (within certain constraints, of course) and still restart
>> from checkpoints (unlike DStreams). If you are just building out your
>> streaming applications, then I highly recommend you to try out Structured
>> Streaming instead of DStreams (which is effectively in maintenance mode).
>>
>>
>> On Fri, Aug 25, 2017 at 7:41 PM, Hugo Reinwald 
>> wrote:
>>
>>> Hello,
>>>
>>> I am implementing a spark streaming solution with Kafka and read that
>>> checkpoints cannot be used across application code changes - here
>>> 
>>>
>>> I tested changes in application code and got the error message as b
>>> below -
>>>
>>> 17/08/25 15:10:47 WARN CheckpointReader: Error reading checkpoint from
>>> file file:/tmp/checkpoint/checkpoint-150364116.bk
>>> java.io.InvalidClassException: scala.collection.mutable.ArrayBuffer;
>>> local class incompatible: stream classdesc serialVersionUID =
>>> -2927962711774871866, local class serialVersionUID = 1529165946227428979
>>>
>>> While I understand that this is as per design, can I know why does
>>> checkpointing work the way that it does verifying the class signatures?
>>> Would it not be easier to let the developer decide if he/she wants to use
>>> the old checkpoints depending on what is the change in application logic
>>> e.g. changes in code unrelated to spark/kafka - Logging / conf changes etc
>>>
>>> This is first post in the group. Apologies if I am asking the question
>>> again, I did a nabble search and it didnt throw up the answer.
>>>
>>> Thanks for the help.
>>> Hugo
>>>
>>
>>


Losing system properties on executor side, if context is checkpointed

2019-02-19 Thread Dmitry Goldenberg
Hi all,

I'm seeing an odd behavior where if I switch the context from regular to
checkpointed, the system properties are no longer automatically carried
over into the worker / executors and turn out to be null there.

This is in Java, using spark-streaming_2.10, version 1.5.0.

I'm placing properties into a Properties object and pass it over into the
worker logic.  I would think I shouldn't have to do this and it works if I
just set properties into the regular context, they get automatically set
and carried over to the worker side in that case.

Is this something fixed or changed in the later versions of Spark?

This is what I ended up doing the following in the driver program:

  private JavaStreamingContext createCheckpointedContext(SparkConf sparkConf,
Parameters params) {

JavaStreamingContextFactory factory = new JavaStreamingContextFactory()
{

  @Override

  public JavaStreamingContext create() {

return createContext(sparkConf, params);

  }

};

return JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
factory);

  }


  private JavaStreamingContext createContext(SparkConf sparkConf,
Parameters params) {

// Create context with the specified batch interval, in milliseconds.

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
Durations.milliseconds(params.getBatchDurationMillis()));

// Set the checkpoint directory, if we're checkpointing

if (params.isCheckpointed()) {

  jssc.checkpoint(params.getCheckpointDir());

}

Properties props = new Properties();

  JavaConverters.seqAsJavaListConverter(sparkConf
.getExecutorEnv()).asJava().stream().map(x -> props.setProperty(x._1, x._2
));


.

  // ... Create Direct Stream from Kafka...

messageBodies.foreachRDD(new Function, Void>() {

  @Override

  public Void call(JavaRDD rdd) throws Exception {

ProcessPartitionFunction func = new ProcessPartitionFunction(

  props, // <-- Had to pass that through, so this works in a
checkpointed scenario

  params.getAppName(),

  params.getTopic(),

  ..

rdd.foreachPartition(func);

return null;

  }

});

Would appreciate any recommendations/clues,
Thanks,
- Dmitry


Issues with Spark Streaming checkpointing of Kafka topic content

2019-04-02 Thread Dmitry Goldenberg
Hi,

I've got 3 questions/issues regarding checkpointing, was hoping someone
could help shed some light on this.

We've got a Spark Streaming consumer consuming data from a Kafka topic;
works fine generally until I switch it to the checkpointing mode by calling
the 'checkpoint' method on the context and pointing the checkpointing at a
directory in HDFS.

I can see that files get written to that directory however I don't see new
Kafka content being processed.

*Question 1.* Is it possible that the checkpointed consumer is off base in
its understanding of where the offsets are on the topic and how could I
troubleshoot that?  Is it possible that some "confusion" happens if a
consumer is switched back and forth between checkpointed and not? How could
we tell?

*Question 2.* About spark.streaming.receiver.writeAheadLog.enable. By
default this is false. "All the input data received through receivers will
be saved to write ahead logs that will allow it to be recovered after
driver failures."  So if we don't set this to true, what *will* get saved
into checkpointing and what data *will* be recovered upon the driver
restarting?

*Question 3.* We want the RDD's to be treated as successfully processed
only once we have done all the necessary transformations and actions on the
data.  By default, will the Spark Streaming checkpointing simply mark the
topic offsets as having been processed once the data has been received by
Spark?  Or, once the data has been processed by the driver + the workers
successfully?  If the former, how can we configure checkpointing to do the
latter?

Thanks,
- Dmitry


Re: Issues with Spark Streaming checkpointing of Kafka topic content

2019-04-02 Thread Dmitry Goldenberg
To add more info, this project is on an older version of Spark, 1.5.0, and
on an older version of Kafka which is 0.8.2.1 (2.10-0.8.2.1).

On Tue, Apr 2, 2019 at 11:39 AM Dmitry Goldenberg 
wrote:

> Hi,
>
> I've got 3 questions/issues regarding checkpointing, was hoping someone
> could help shed some light on this.
>
> We've got a Spark Streaming consumer consuming data from a Kafka topic;
> works fine generally until I switch it to the checkpointing mode by calling
> the 'checkpoint' method on the context and pointing the checkpointing at a
> directory in HDFS.
>
> I can see that files get written to that directory however I don't see new
> Kafka content being processed.
>
> *Question 1.* Is it possible that the checkpointed consumer is off base
> in its understanding of where the offsets are on the topic and how could I
> troubleshoot that?  Is it possible that some "confusion" happens if a
> consumer is switched back and forth between checkpointed and not? How could
> we tell?
>
> *Question 2.* About spark.streaming.receiver.writeAheadLog.enable. By
> default this is false. "All the input data received through receivers
> will be saved to write ahead logs that will allow it to be recovered after
> driver failures."  So if we don't set this to true, what *will* get saved
> into checkpointing and what data *will* be recovered upon the driver
> restarting?
>
> *Question 3.* We want the RDD's to be treated as successfully processed
> only once we have done all the necessary transformations and actions on the
> data.  By default, will the Spark Streaming checkpointing simply mark the
> topic offsets as having been processed once the data has been received by
> Spark?  Or, once the data has been processed by the driver + the workers
> successfully?  If the former, how can we configure checkpointing to do the
> latter?
>
> Thanks,
> - Dmitry
>


Implementing FIRST_VALUE, LEAD, LAG in Spark

2015-02-13 Thread Dmitry Tolpeko
Hello,

To convert existing Map Reduce jobs to Spark, I need to implement window
functions such as FIRST_VALUE, LEAD, LAG and so on. For example,
FIRST_VALUE function:

Source (1st column is key):

A, A1
A, A2
A, A3
B, B1
B, B2
C, C1

and the result should be

A, A1, A1
A, A2, A1
A, A3, A1
B, B1, B1
B, B2, B1
C, C1, C1

You can see that the first value in a group is repeated in each row.

My current Spark/Scala code:

def firstValue(b: Iterable[String]) : List[(String, String)] = {
  val c = scala.collection.mutable.MutableList[(String, String)]()
  var f = ""
  b.foreach(d => { if(f.isEmpty()) f = d; c += d -> f})
  c.toList
}

val data=sc.parallelize(List(
   ("A", "A1"),
   ("A", "A2"),
   ("A", "A3"),
   ("B", "B1"),
   ("B", "B2"),
   ("C", "C1")))

data.groupByKey().map{case(a, b)=>(a, firstValue(b))}.collect

So I create a new list after groupByKey. Is it right approach to do this in
Spark? Are there any other options? Please point me to any drawbacks.

Thanks,

Dmitry


Re: Implementing FIRST_VALUE, LEAD, LAG in Spark

2015-02-17 Thread Dmitry Tolpeko
I ended up with the following:

def firstValue(items: Iterable[String]) = for { i <- items
} yield (i, items.head)

data.groupByKey().map{case(a, b)=>(a, firstValue(b))}.collect

More details:
http://dmtolpeko.com/2015/02/17/first_value-last_value-lead-and-lag-in-spark/

I would appreciate any feedback.

Dmitry

On Fri, Feb 13, 2015 at 11:54 AM, Dmitry Tolpeko 
wrote:

> Hello,
>
> To convert existing Map Reduce jobs to Spark, I need to implement window
> functions such as FIRST_VALUE, LEAD, LAG and so on. For example,
> FIRST_VALUE function:
>
> Source (1st column is key):
>
> A, A1
> A, A2
> A, A3
> B, B1
> B, B2
> C, C1
>
> and the result should be
>
> A, A1, A1
> A, A2, A1
> A, A3, A1
> B, B1, B1
> B, B2, B1
> C, C1, C1
>
> You can see that the first value in a group is repeated in each row.
>
> My current Spark/Scala code:
>
> def firstValue(b: Iterable[String]) : List[(String, String)] = {
>   val c = scala.collection.mutable.MutableList[(String, String)]()
>   var f = ""
>   b.foreach(d => { if(f.isEmpty()) f = d; c += d -> f})
>   c.toList
> }
>
> val data=sc.parallelize(List(
>("A", "A1"),
>("A", "A2"),
>("A", "A3"),
>("B", "B1"),
>("B", "B2"),
>("C", "C1")))
>
> data.groupByKey().map{case(a, b)=>(a, firstValue(b))}.collect
>
> So I create a new list after groupByKey. Is it right approach to do this
> in Spark? Are there any other options? Please point me to any drawbacks.
>
> Thanks,
>
> Dmitry
>
>


Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Dmitry Goldenberg
Are you proposing I downgrade Solrj's httpclient dependency to be on par with 
that of Spark/Hadoop? Or upgrade Spark/Hadoop's httpclient to the latest?

Solrj has to stay with its selected version. I could try and rebuild Spark with 
the latest httpclient but I've no idea what effects that may cause on Spark.

Sent from my iPhone

> On Feb 18, 2015, at 1:37 AM, Arush Kharbanda  
> wrote:
> 
> Hi
> 
> Did you try to make maven pick the latest version
> 
> http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management
> 
> That way solrj won't cause any issue, you can try this and check if the part 
> of your code where you access HDFS works fine?
> 
> 
> 
>> On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg  
>> wrote:
>> I'm getting the below error when running spark-submit on my class. This class
>> has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ
>> 4.10.3 from within the class.
>> 
>> This is in conflict with the older version, HttpClient 3.1 that's a
>> dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).
>> 
>> I've tried setting spark.files.userClassPathFirst to true in SparkConf in my
>> program, also setting it to true in  $SPARK-HOME/conf/spark-defaults.conf as
>> 
>> spark.files.userClassPathFirst true
>> 
>> No go, I'm still getting the error, as below. Is there anything else I can
>> try? Are there any plans in Spark to support multiple class loaders?
>> 
>> Exception in thread "main" java.lang.NoSuchMethodError:
>> org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
>> at
>> org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
>> at
>> org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
>> at
>> org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
>> at
>> org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
>> at
>> org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
>> at
>> org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
>> at
>> org.apache.solr.client.solrj.impl.HttpSolrServer.(HttpSolrServer.java:168)
>> at
>> org.apache.solr.client.solrj.impl.HttpSolrServer.(HttpSolrServer.java:141)
>> ...
>> 
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 
> 
> -- 
> 
> 
> Arush Kharbanda || Technical Teamlead
> 
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com


Re: Class loading issue, spark.files.userClassPathFirst doesn't seem to be working

2015-02-18 Thread Dmitry Goldenberg
I think I'm going to have to rebuild Spark with commons.httpclient.version
set to 4.3.1 which looks to be the version chosen by Solrj, rather than the
4.2.6 that Spark's pom mentions. Might work.

On Wed, Feb 18, 2015 at 1:37 AM, Arush Kharbanda  wrote:

> Hi
>
> Did you try to make maven pick the latest version
>
>
> http://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Management
>
> That way solrj won't cause any issue, you can try this and check if the
> part of your code where you access HDFS works fine?
>
>
>
> On Wed, Feb 18, 2015 at 10:23 AM, dgoldenberg 
> wrote:
>
>> I'm getting the below error when running spark-submit on my class. This
>> class
>> has a transitive dependency on HttpClient v.4.3.1 since I'm calling SolrJ
>> 4.10.3 from within the class.
>>
>> This is in conflict with the older version, HttpClient 3.1 that's a
>> dependency of Hadoop 2.4 (I'm running Spark 1.2.1 built for Hadoop 2.4).
>>
>> I've tried setting spark.files.userClassPathFirst to true in SparkConf in
>> my
>> program, also setting it to true in  $SPARK-HOME/conf/spark-defaults.conf
>> as
>>
>> spark.files.userClassPathFirst true
>>
>> No go, I'm still getting the error, as below. Is there anything else I can
>> try? Are there any plans in Spark to support multiple class loaders?
>>
>> Exception in thread "main" java.lang.NoSuchMethodError:
>>
>> org.apache.http.impl.conn.SchemeRegistryFactory.createSystemDefault()Lorg/apache/http/conn/scheme/SchemeRegistry;
>> at
>>
>> org.apache.http.impl.client.SystemDefaultHttpClient.createClientConnectionManager(SystemDefaultHttpClient.java:121)
>> at
>>
>> org.apache.http.impl.client.AbstractHttpClient.getConnectionManager(AbstractHttpClient.java:445)
>> at
>>
>> org.apache.solr.client.solrj.impl.HttpClientUtil.setMaxConnections(HttpClientUtil.java:206)
>> at
>>
>> org.apache.solr.client.solrj.impl.HttpClientConfigurer.configure(HttpClientConfigurer.java:35)
>> at
>>
>> org.apache.solr.client.solrj.impl.HttpClientUtil.configureClient(HttpClientUtil.java:142)
>> at
>>
>> org.apache.solr.client.solrj.impl.HttpClientUtil.createClient(HttpClientUtil.java:118)
>> at
>>
>> org.apache.solr.client.solrj.impl.HttpSolrServer.(HttpSolrServer.java:168)
>> at
>>
>> org.apache.solr.client.solrj.impl.HttpSolrServer.(HttpSolrServer.java:141)
>> ...
>>
>>
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Class-loading-issue-spark-files-userClassPathFirst-doesn-t-seem-to-be-working-tp21693.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
>
> [image: Sigmoid Analytics] 
>
> *Arush Kharbanda* || Technical Teamlead
>
> ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
>


  1   2   >