Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-14 Thread Gene Pang
Hi Dmitry,

I am not familiar with all of the details you have just described, but I
think Tachyon should be able to help you.

If you store all of your resource files in HDFS or S3 or both, you can run
Tachyon to use those storage systems as the under storage (
http://tachyon-project.org/documentation/Mounting-and-Transparent-Naming.html).
Then, you can run Spark to read from Tachyon as an HDFS-compatible file
system and do your processing. If you ever need to write data back out, you
can write it out as a file back into Tachyon, which can also be reflected
into your original store (HDFS, S3). The caching in Tachyon is done
on-demand, and you can use the LRU policy for eviction (or a customized
policy: http://tachyon-project.org/documentation/Tiered-Storage-on-Tachyon.html).
I think this is close to your option #4. I hope this helps!
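
For illustration, here is a rough sketch of what the Spark side could look like
once Tachyon is mounted over your under storage (the master host, port, and paths
below are placeholders, not taken from your setup):

// Read a resource through Tachyon's HDFS-compatible interface
val dict = sc.textFile("tachyon://tachyon-master:19998/resources/dictionary.txt")

// ... process ...

// Write results back through Tachyon; with under storage mounted, they can
// also be persisted back into HDFS/S3
dict.map(_.toUpperCase).saveAsTextFile("tachyon://tachyon-master:19998/output/dictionary-upper")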

Thanks,
Gene

On Thu, Jan 14, 2016 at 4:54 AM, Dmitry Goldenberg  wrote:

> OK so it looks like Tachyon is a cluster memory plugin marked as
> "experimental" in Spark.
>
> In any case, we've got a few requirements for the system we're working on
> which may drive the decision for how to implement large resource file
> management.
>
> The system is a framework of N data analyzers which take incoming
> documents as input and transform them or extract some data out of those
> documents.  These analyzers can be chained together which makes it a great
> case for processing with RDD's and a set of map/filter types of Spark
> functions. There's already an established framework API which we want to
> preserve.  This means that most likely, we'll create a relatively thin
> "binding" layer for exposing these analyzers as well-documented functions
> to the end-users who want to use them in a Spark based distributed
> computing environment.
>
> We also want to, ideally, hide the complexity of how these resources are
> loaded from the end-users who will be writing the actual Spark jobs that
> utilize the Spark "binding" functions that we provide.
>
> So, for managing large numbers of small, medium, or large resource files,
> we're considering the below options, with a variety of pros and cons
> attached, from the following perspectives:
>
> a) persistence - where do the resources reside initially;
> b) loading - what are the mechanics for loading of these resources;
> c) caching and sharing across worker nodes.
>
> Possible options:
>
> 1. Load each resource into a broadcast variable. Considering that we have
> scores if not hundreds of these resource files, maintaining that many
> broadcast variables seems like a complexity that's going to be hard to
> manage. We'd also need a translation layer between the broadcast variables
> and the internal API that would want to "speak" InputStream's rather than
> broadcast variables.
>
> 2. Load resources into RDD's and perform join's against them from our
> incoming document data RDD's, thus achieving the effect of a value lookup
> from the resources.  While this seems like a very Spark'y way of doing
> things, the lookup mechanics seem quite non-trivial, especially because
> some of the resources aren't going to be pure dictionaries; they may be
> statistical models.  Additionally, this forces us to utilize Spark's
> semantics for handling of these resources which means a potential rewrite
> of our internal product API. That would be a hard option to go with.
>
> 3. Pre-install all the needed resources on each of the worker nodes;
> retrieve the needed resources from the file system and load them into
> memory as needed. Ideally, the resources would only be installed once, on
> the Spark driver side; we'd want to avoid having to pre-install all these
> files on each node. However, we've done this as an exercise and this
> approach works OK.
>
> 4. Pre-load all the resources into HDFS or S3 i.e. into some distributed
> persistent store; load them into cluster memory from there, as necessary.
> Presumably this could be a pluggable store with a common API exposed.
> Since our framework is an OEM'able product, we could plug and play with a
> variety of such persistent stores via Java's FileSystem/URL scheme handler
> API's.
>
> 5. Implement a Resource management server, with a RESTful interface on
> top. Under the covers, this could be a wrapper on top of #4.  Potentially
> unnecessary if we have a solid persistent store API as per #4.
>
> 6. Beyond persistence, caching also has to be considered for these
> resources. We've considered Tachyon (especially since it's pluggable into
> Spark), Redis, and the like. Ideally, I would think we'd want resources to
> be loaded into the cluster memory as needed; paged in/out on-demand in an
> LRU fashion.  From this perspective, it's not yet clear to me what the best
> option(s) would be. Any thoughts / recommendations would be appreciated.
>
>
>
>
>
> On Tue, Jan 12, 2016 at 3:04 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Thanks, Gene.
>>
>> Does Spark 

Re: Spark and HBase RDD join/get

2016-01-14 Thread Ted Yu
For #1, yes it is possible.

You can find some examples in the hbase-spark module of HBase, where HBase as a
DataSource is provided.
e.g.

https://github.com/apache/hbase/blob/master/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
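
For the map-function approach in your first question, a minimal sketch could look
like the following (the table, column family, and qualifier names are made up, it
assumes userIdRdd is an RDD[String] of user ids, and it uses the plain HBase client
API rather than the hbase-spark module; note the connection is opened once per
partition, not per record):

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

val enriched = userIdRdd.mapPartitions { userIds =>
  val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
  val table = conn.getTable(TableName.valueOf("users"))
  // materialize the results before closing the connection
  val out = userIds.map { id =>
    val result = table.get(new Get(Bytes.toBytes(id)))
    (id, Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("profile"))))
  }.toList
  table.close()
  conn.close()
  out.iterator
}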

Cheers

On Thu, Jan 14, 2016 at 5:04 AM, Kristoffer Sjögren 
wrote:

> Hi
>
> We have a RDD that needs to be mapped with information from
> HBase, where the exact key is the user id.
>
> What's the different alternatives for doing this?
>
> - Is it possible to do HBase.get() requests from a map function in Spark?
> - Or should we join RDDs with all full HBase table scan?
>
> I ask because full table scans feels inefficient, especially if the
> input RDD is really small compared to the full table. But I
> realize that a full table scan may not be what happens in reality?
>
> Cheers,
> -Kristoffer
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


RE: Random Forest FeatureImportance throwing NullPointerException

2016-01-14 Thread Rachana Srivastava
I tried the 1.6 version of Spark, whose API takes numberOfFeatures as a fifth
argument, but I am still getting featureImportances as null.

RandomForestClassifier rfc = getRandomForestClassifier( numTrees,  maxBinSize,  
maxTreeDepth,  seed,  impurity);
RandomForestClassificationModel rfm = 
RandomForestClassificationModel.fromOld(model, rfc, categoricalFeatures, 
numberOfClasses,numberOfFeatures);
System.out.println(rfm.featureImportances());

Stack Trace:
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.ml.tree.impl.RandomForest$.computeFeatureImportance(RandomForest.scala:1152)
at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:)
at org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:1108)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at org.apache.spark.ml.tree.impl.RandomForest$.featureImportances(RandomForest.scala:1108)
at org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances$lzycompute(RandomForestClassifier.scala:237)
at org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances(RandomForestClassifier.scala:237)
at com.markmonitor.antifraud.ce.ml.CheckFeatureImportance.main(CheckFeatureImportance.java:49)

From: Rachana Srivastava
Sent: Wednesday, January 13, 2016 3:30 PM
To: 'user@spark.apache.org'; 'd...@spark.apache.org'
Subject: Random Forest FeatureImportance throwing NullPointerException

I have a Random Forest model for which I am trying to get the featureImportances
vector.

Map categoricalFeaturesParam = new HashMap<>();
scala.collection.immutable.Map categoricalFeatures =  
(scala.collection.immutable.Map)
scala.collection.immutable.Map$.MODULE$.apply(JavaConversions.mapAsScalaMap(categoricalFeaturesParam).toSeq());
int numberOfClasses =2;
RandomForestClassifier rfc = new RandomForestClassifier();
RandomForestClassificationModel rfm = 
RandomForestClassificationModel.fromOld(model, rfc, categoricalFeatures, 
numberOfClasses);
System.out.println(rfm.featureImportances());

When I run the above code, featureImportances comes back null.  Do I need to set
anything specific to get the feature importances for the random forest model?

Thanks,

Rachana


Re: SparkContext SyntaxError: invalid syntax

2016-01-14 Thread Andrew Weiner
Hi Bryan,

I ran "$> python --version" on every node on the cluster, and it is Python
2.7.8 for every single one.

When I try to submit the Python example in client mode
* ./bin/spark-submit  --master yarn --deploy-mode client
--driver-memory 4g --executor-memory 2g --executor-cores 1
./examples/src/main/python/pi.py 10*
That's when I get this error that I mentioned:

16/01/14 10:09:10 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0
(TID 0, mundonovo-priv): org.apache.spark.SparkException:
Error from python worker:
  python: module pyspark.daemon not found
PYTHONPATH was:

/scratch5/hadoop/yarn/local/usercache//filecache/48/spark-assembly-1.6.0-hadoop2.4.0.jar:/home/aqualab/spark-1.6.0-bin-hadoop2.4/python:/home/jpr123/hg.pacific/python-common:/home/jpr123/python-libs:/home/jpr123/lib/python2.7/site-packages:/home/zsb739/local/lib/python2.7/site-packages:/home/jpr123/mobile-cdn-analysis:/home//lib/python2.7/site-packages:/home//code/libs:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0187/container_1450370639491_0187_01_02/pyspark.zip:/scratch5/hadoop/yarn/local/usercache//appcache/application_1450370639491_0187/container_1450370639491_0187_01_02/py4j-0.9-src.zip
java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at org.apache.spark.api.python.PythonWorkerFactory.startDaemon(PythonWorkerFactory.scala:164)
at []

followed by several more similar errors that also say:
Error from python worker:
  python: module pyspark.daemon not found


Even though the default python appeared to be correct, I just went ahead
and explicitly set PYSPARK_PYTHON in conf/spark-env.sh to the path of the
default python binary executable.  After making this change I was able to
run the job successfully in client mode!  That is, this appeared to fix the
"pyspark.daemon not found" error when running in client mode.

However, when running in cluster mode, I am still getting the same syntax
error:

Traceback (most recent call last):
  File "pi.py", line 24, in ?
from pyspark import SparkContext
  File "/home//spark-1.6.0-bin-hadoop2.4/python/pyspark/__init__.py",
line 61
indent = ' ' * (min(len(m) for m in indents) if indents else 0)
  ^
SyntaxError: invalid syntax

Is it possible that the PYSPARK_PYTHON environment variable is ignored
when jobs are submitted in cluster mode?  It seems that Spark or Yarn
is going behind my back, so to speak, and using some older version of
python I didn't even know was installed.
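
One thing I plan to try next, in case conf/spark-env.sh is not picked up by the
YARN application master, is passing the interpreter explicitly at submit time,
along these lines (assuming these YARN-related properties apply to this Spark
version):

 ./bin/spark-submit --master yarn --deploy-mode cluster \
   --conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=/usr/bin/python2.7 \
   --conf spark.executorEnv.PYSPARK_PYTHON=/usr/bin/python2.7 \
   --driver-memory 4g --executor-memory 2g --executor-cores 1 \
   ./examples/src/main/python/pi.py 10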

Thanks again for all your help thus far.  We are getting close

Andrew



On Wed, Jan 13, 2016 at 6:13 PM, Bryan Cutler  wrote:

> Hi Andrew,
>
> There are a couple of things to check.  First, is Python 2.7 the default
> version on all nodes in the cluster or is it an alternate install? Meaning
> what is the output of this command "$>  python --version"  If it is an
> alternate install, you could set the environment variable "PYSPARK_PYTHON"
> Python binary executable to use for PySpark in both driver and workers
> (default is python).
>
> Did you try to submit the Python example under client mode?  Otherwise,
> the command looks fine, you don't use the --class option for submitting
> python files
> * ./bin/spark-submit  --master yarn --deploy-mode client
> --driver-memory 4g --executor-memory 2g --executor-cores 1
> ./examples/src/main/python/pi.py 10*
>
> That is a good sign that local jobs and Java examples work, probably just
> a small configuration issue :)
>
> Bryan
>
> On Wed, Jan 13, 2016 at 3:51 PM, Andrew Weiner <
> andrewweiner2...@u.northwestern.edu> wrote:
>
>> Thanks for your continuing help.  Here is some additional info.
>>
>> *OS/architecture*
>> output of *cat /proc/version*:
>> Linux version 2.6.18-400.1.1.el5 (mockbu...@x86-012.build.bos.redhat.com)
>>
>> output of *lsb_release -a*:
>> LSB Version:
>>  
>> :core-4.0-amd64:core-4.0-ia32:core-4.0-noarch:graphics-4.0-amd64:graphics-4.0-ia32:graphics-4.0-noarch:printing-4.0-amd64:printing-4.0-ia32:printing-4.0-noarch
>> Distributor ID: RedHatEnterpriseServer
>> Description:Red Hat Enterprise Linux Server release 5.11 (Tikanga)
>> Release:5.11
>> Codename:   Tikanga
>>
>> *Running a local job*
>> I have confirmed that I can successfully run python jobs using
>> bin/spark-submit --master local[*]
>> Specifically, this is the command I am using:
>> *./bin/spark-submit --master local[8]
>> ./examples/src/main/python/wordcount.py
>> file:/home//spark-1.6.0-bin-hadoop2.4/README.md*
>> And it works!
>>
>> *Additional info*
>> I am also able to successfully run the Java SparkPi example using yarn in
>> cluster mode using this command:
>> * ./bin/spark-submit --class org.apache.spark.examples.SparkPi
>> --master yarn --deploy-mode cluster --driver-memory 4g
>> --executor-memory 2g --executor-cores 1 lib/spark-examples*.jar

Re: [discuss] dropping Hadoop 2.2 and 2.3 support in Spark 2.0?

2016-01-14 Thread Sean Owen
I personally support this. I had suggested drawing the line at Hadoop
2.6, but that's minor. More info:

Hadoop 2.7: April 2015
Hadoop 2.6: Nov 2014
Hadoop 2.5: Aug 2014
Hadoop 2.4: April 2014
Hadoop 2.3: Feb 2014
Hadoop 2.2: Oct 2013

CDH 5.0/5.1 = Hadoop 2.3 + backports
CDH 5.2/5.3 = Hadoop 2.5 + backports
CDH 5.4+ = Hadoop 2.6 + chunks of 2.7 + backports.

I can only imagine that CDH6 this year will be based on something
later still like 2.8 (no idea about the 3.0 schedule). In the sense
that 5.2 was released about a year and half ago, yes, this vendor has
moved on from 2.3 a while ago. These releases will also never contain
a different minor Spark release. For example 5.7 will have Spark 1.6,
I believe, and not 2.0.

Here, I listed some additional things we could clean up in Spark if
Hadoop 2.6 was assumed. By itself, not a lot:
https://github.com/apache/spark/pull/10446#issuecomment-167971026

Yes, we also get less Jenkins complexity. Mostly, the jar-hell that's
biting now gets a little more feasible to fix. And we get Hadoop fixes
as well as new APIs, which helps mostly for YARN.

My general position is that backwards-compatibility and supporting
older platforms needs to be a low priority in a major release; it's a
decision about what to support for users in the next couple years, not
the preceding couple years. Users on older technologies simply stay on
the older Spark until ready to update; they are in no sense suddenly
left behind otherwise.

On Thu, Jan 14, 2016 at 6:29 AM, Reynold Xin  wrote:
> We've dropped Hadoop 1.x support in Spark 2.0.
>
> There is also a proposal to drop Hadoop 2.2 and 2.3, i.e. the minimal Hadoop
> version we support would be Hadoop 2.4. The main advantage is then we'd be
> able to focus our Jenkins resources (and the associated maintenance of
> Jenkins) to create builds for Hadoop 2.6/2.7. It is my understanding that
> all Hadoop vendors have moved away from 2.2/2.3, but there might be some
> users that are on these older versions.
>
> What do you think about this idea?
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Read Accumulator value while running

2016-01-14 Thread Mennour Rostom
Hi Daniel, Andrew

Thank you for your answers. So it's not possible to read the accumulator
value until the action that manipulates it finishes; that's a pity, and I'll
think of something else. However, the most important thing in my application
is the ability to launch 2 (or more) actions in parallel and concurrently: "
*within* each Spark application, multiple “jobs” (Spark actions) may be
running concurrently if they were submitted by different threads" [official Job
Scheduling docs].

My actions have to run on the same RDD, e.g.:

RDD D;
D.ac1(func1) [a foreach, for instance]  //  D.ac2(func2) [a foreachPartition or
whatever]


Can this be done with asynchronous actions? (It is not working for me on a
single node :/ ) A rough sketch of what I mean is below.
Can I use a (shared) broadcast variable in the two actions? If yes, what
happens if I change the value of the broadcast variable?
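
To make the D.ac1 // D.ac2 line above concrete, this is roughly what I mean
(func1/func2 and the input path are just placeholders; the two actions are
submitted from separate threads so the scheduler can run them concurrently):

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.concurrent.ExecutionContext.Implicits.global

def func1(s: String): Unit = ()                    // placeholder per-record work
def func2(it: Iterator[String]): Unit = ()         // placeholder per-partition work

val d = sc.textFile("hdfs:///some/input").cache()

val ac1 = Future { d.foreach(func1) }              // long-running action
val ac2 = Future { d.foreachPartition(func2) }     // second, concurrent action

Await.result(Future.sequence(Seq(ac1, ac2)), Duration.Inf)

// alternatively, the built-in async actions, e.g. d.countAsync() or d.foreachAsync(func1)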

On the other hand, I would really like to know more about Spark 2.0.

Thank you,
Regards

2016-01-13 20:31 GMT+01:00 Andrew Or :

> Hi Kira,
>
> As you suspected, accumulator values are only updated after the task
> completes. We do send accumulator updates from the executors to the driver
> on periodic heartbeats, but these only concern internal accumulators, not
> the ones created by the user.
>
> In short, I'm afraid there is not currently a way (in Spark 1.6 and
> before) to access the accumulator values until after the tasks that updated
> them have completed. This will change in Spark 2.0, the next version,
> however.
>
> Please let me know if you have more questions.
> -Andrew
>
> 2016-01-13 11:24 GMT-08:00 Daniel Imberman :
>
>> Hi Kira,
>>
>> I'm having some trouble understanding your question. Could you please
>> give a code example?
>>
>>
>>
>> From what I think you're asking there are two issues with what you're
>> looking to do. (Please keep in mind I could be totally wrong on both of
>> these assumptions, but this is what I've been lead to believe)
>>
>> 1. The contract of an accumulator is that you can't actually read the
>> value as the function is performing because the values in the accumulator
>> don't actually mean anything until they are reduced. If you were looking
>> for progress in a local context, you could do mapPartitions and have a
>> local accumulator per partition, but I don't think it's possible to get the
>> actual accumulator value in the middle of the map job.
>>
>> 2. As far as performing ac2 while ac1 is "always running", I'm pretty
>> sure that's not possible. The way that lazy valuation works in Spark, the
>> transformations have to be done serially. Having it any other way would
>> actually be really bad because then you could have ac1 changing the data
>> thereby making ac2's output unpredictable.
>>
>> That being said, with a more specific example it might be possible to
>> help figure out a solution that accomplishes what you are trying to do.
>>
>> On Wed, Jan 13, 2016 at 5:43 AM Kira  wrote:
>>
>>> Hi,
>>>
>>> So i have an action on one RDD that is relatively long, let's call it
>>> ac1;
>>> what i want to do is to execute another action (ac2) on the same RDD to
>>> see
>>> the evolution of the first one (ac1); for this end i want to use an
>>> accumulator and read it's value progressively to see the changes on it
>>> (on
>>> the fly) while ac1 is always running. My problem is that the accumulator
>>> is
>>> only updated once the ac1 has been finished, this is not helpful for me
>>> :/ .
>>>
>>> I ve seen  here
>>> <
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Asynchronous-Broadcast-from-driver-to-workers-is-it-possible-td15758.html
>>> >
>>> what may seem like a solution for me but it doesn t work : "While Spark
>>> already offers support for asynchronous reduce (collect data from
>>> workers,
>>> while not interrupting execution of a parallel transformation) through
>>> accumulator"
>>>
>>> Another post suggested to use SparkListner to do that.
>>>
>>> are these solutions correct ? if yes, give me a simple exemple ?
>>> are there other solutions ?
>>>
>>> thank you.
>>> Regards
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Read-Accumulator-value-while-running-tp25960.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>


Spark and HBase RDD join/get

2016-01-14 Thread Kristoffer Sjögren
Hi

We have a RDD that needs to be mapped with information from
HBase, where the exact key is the user id.

What are the different alternatives for doing this?

- Is it possible to do HBase.get() requests from a map function in Spark?
- Or should we join RDDs against a full HBase table scan?

I ask because a full table scan feels inefficient, especially if the
input RDD is really small compared to the full table. But I
realize that a full table scan may not be what happens in reality?

Cheers,
-Kristoffer

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-14 Thread Dmitry Goldenberg
OK so it looks like Tachyon is a cluster memory plugin marked as
"experimental" in Spark.

In any case, we've got a few requirements for the system we're working on
which may drive the decision for how to implement large resource file
management.

The system is a framework of N data analyzers which take incoming documents
as input and transform them or extract some data out of those documents.
These analyzers can be chained together which makes it a great case for
processing with RDD's and a set of map/filter types of Spark functions.
There's already an established framework API which we want to preserve.
This means that most likely, we'll create a relatively thin "binding" layer
for exposing these analyzers as well-documented functions to the end-users
who want to use them in a Spark based distributed computing environment.

We also want to, ideally, hide the complexity of how these resources are
loaded from the end-users who will be writing the actual Spark jobs that
utilize the Spark "binding" functions that we provide.

So, for managing large numbers of small, medium, or large resource files,
we're considering the below options, with a variety of pros and cons
attached, from the following perspectives:

a) persistence - where do the resources reside initially;
b) loading - what are the mechanics for loading of these resources;
c) caching and sharing across worker nodes.

Possible options (rough sketches of what options 1, 2 and 4 could look like follow
at the end of this list):

1. Load each resource into a broadcast variable. Considering that we have
scores if not hundreds of these resource files, maintaining that many
broadcast variables seems like a complexity that's going to be hard to
manage. We'd also need a translation layer between the broadcast variables
and the internal API that would want to "speak" InputStream's rather than
broadcast variables.

2. Load resources into RDD's and perform join's against them from our
incoming document data RDD's, thus achieving the effect of a value lookup
from the resources.  While this seems like a very Spark'y way of doing
things, the lookup mechanics seem quite non-trivial, especially because
some of the resources aren't going to be pure dictionaries; they may be
statistical models.  Additionally, this forces us to utilize Spark's
semantics for handling of these resources which means a potential rewrite
of our internal product API. That would be a hard option to go with.

3. Pre-install all the needed resources on each of the worker nodes;
retrieve the needed resources from the file system and load them into
memory as needed. Ideally, the resources would only be installed once, on
the Spark driver side; we'd want to avoid having to pre-install all these
files on each node. However, we've done this as an exercise and this
approach works OK.

4. Pre-load all the resources into HDFS or S3 i.e. into some distributed
persistent store; load them into cluster memory from there, as necessary.
Presumably this could be a pluggable store with a common API exposed.
Since our framework is an OEM'able product, we could plug and play with a
variety of such persistent stores via Java's FileSystem/URL scheme handler
API's.

5. Implement a Resource management server, with a RESTful interface on top.
Under the covers, this could be a wrapper on top of #4.  Potentially
unnecessary if we have a solid persistent store API as per #4.

6. Beyond persistence, caching also has to be considered for these
resources. We've considered Tachyon (especially since it's pluggable into
Spark), Redis, and the like. Ideally, I would think we'd want resources to
be loaded into the cluster memory as needed; paged in/out on-demand in an
LRU fashion.  From this perspective, it's not yet clear to me what the best
option(s) would be. Any thoughts / recommendations would be appreciated.
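
To make a few of these more concrete, here are some rough sketches of what options
1, 2 and 4 could look like (all names, paths and formats below are hypothetical,
only meant to illustrate the shape of each approach):

Option 1 - broadcast variables behind an InputStream-oriented translation layer:

import java.io.{ByteArrayInputStream, InputStream}

// load raw resource bytes once on the driver; binaryFiles yields (path, PortableDataStream)
val resourceBytes: Map[String, Array[Byte]] =
  sc.binaryFiles("hdfs:///resources/").collect()
    .map { case (path, stream) => (path, stream.toArray()) }.toMap

val resourcesBc = sc.broadcast(resourceBytes)

// what the internal API would see
def openResource(name: String): InputStream =
  new ByteArrayInputStream(resourcesBc.value(name))

Option 2 - lookup via join (only really fits dictionary-style resources):

val dictionary = sc.textFile("hdfs:///resources/dictionary.tsv")
  .map(_.split("\t")).map(a => (a(0), a(1)))                     // (term, value)

val docTerms = sc.parallelize(Seq(("alpha", 1L), ("beta", 2L)))  // toy (term, docId) pairs

val lookedUp = docTerms.join(dictionary)                         // (term, (docId, value))

Option 4 - a pluggable loader over Hadoop's FileSystem API (hdfs://, s3n://, file://, ...):

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

def openResourceStream(uri: String): java.io.InputStream = {
  val fs = FileSystem.get(new URI(uri), new Configuration())
  fs.open(new Path(uri))
}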





On Tue, Jan 12, 2016 at 3:04 PM, Dmitry Goldenberg  wrote:

> Thanks, Gene.
>
> Does Spark use Tachyon under the covers anyway for implementing its
> "cluster memory" support?
>
> It seems that the practice I hear the most about is the idea of loading
> resources as RDD's and then doing join's against them to achieve the lookup
> effect.
>
> The other approach would be to load the resources into broadcast variables
> but I've heard concerns about memory.  Could we run out of memory if we
> load too much into broadcast vars?  Is there any memory_to_disk/spill to
> disk capability for broadcast variables in Spark?
>
>
> On Tue, Jan 12, 2016 at 11:19 AM, Gene Pang  wrote:
>
>> Hi Dmitry,
>>
>> Yes, Tachyon can help with your use case. You can read and write to
>> Tachyon via the filesystem api (
>> http://tachyon-project.org/documentation/File-System-API.html). There is
>> a native Java API as well as a Hadoop-compatible API. Spark is also able to
>> interact with Tachyon via the Hadoop-compatible API, so Spark jobs can read
>> input files from Tachyon and write output files to Tachyon.
>>
>> I hope that helps,
>> Gene
>>
>> On Tue, Jan 12, 2016 at 4:26 AM, 

Spark SQL . How to enlarge output rows ?

2016-01-14 Thread Eli Super
Hi

After executing sql

sqlContext.sql("select day_time from my_table limit 10").show()

my output looks like  :

++
|  day_time|
++
|2015/12/15 15:52:...|
|2015/12/15 15:53:...|
|2015/12/15 15:52:...|
|2015/12/15 15:52:...|
|2015/12/15 15:52:...|
|2015/12/15 15:52:...|
|2015/12/15 15:51:...|
|2015/12/15 15:52:...|
|2015/12/15 15:52:...|
|2015/12/15 15:53:...|
++


I'd like to get full rows

Thanks !


NPE when using Joda DateTime

2016-01-14 Thread Spencer, Alex (Santander)
Hello,

I was wondering if somebody is able to help me get to the bottom of a null 
pointer exception I'm seeing in my code. I've managed to narrow down a problem 
in a larger class to my use of Joda's DateTime functions. I've successfully run 
my code in scala, but I've hit a few problems when adapting it to run in spark.

Spark version: 1.3.0
Scala version: 2.10.4
Java HotSpot 1.7

I have a small case class called Transaction, which looks something like this:

case class Transaction(date : org.joda.time.DateTime = new 
org.joda.time.DateTime())

I have an RDD[Transactions] trans:
org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at 
:44

I am able to run this successfully:

val test = trans.map(_.date.minusYears(10))
test.take(1)

However if I do:

val groupedTrans = trans.groupBy(_.account)

//For each group, process transactions in turn:
val test = groupedTrans.flatMap { case (_, transList) =>
  transList.map {transaction =>
transaction.date.minusYears(10)
  }
}
test.take(1)

I get:

java.lang.NullPointerException
at org.joda.time.DateTime.minusYears(DateTime.java:1268)

Should the second operation not be equivalent to the first .map one? (It's a 
long way round of producing my error - but it's extremely similar to what's 
happening in my class).

I've got a custom registration class for Kryo which I think is working - before 
I added this the original .map did not work - but shouldn't it be able to 
serialize all instances of Joda DateTime?

Thank you for any help / pointers you can give me.

Kind Regards,
Alex.

Alex Spencer

Emails aren't always secure, and they may be intercepted or changed after
they've been sent. Santander doesn't accept liability if this happens. If you
think someone may have interfered with this email, please get in touch with the
sender another way. This message doesn't create or change any contract.
Santander doesn't accept responsibility for damage caused by any viruses
contained in this email or its attachments. Emails may be monitored. If you've
received this email by mistake, please let the sender know at once that it's
gone to the wrong person and then destroy it without copying, using, or telling
anyone about its contents.
Santander UK plc Reg. No. 2294747 and Abbey National Treasury Services plc Reg.
No. 2338548 Registered Offices: 2 Triton Square, Regent's Place, London NW1 3AN.
Registered in England. www.santander.co.uk. Authorised by the Prudential
Regulation Authority and regulated by the Financial Conduct Authority and the
Prudential Regulation Authority. FCA Reg. No. 106054 and 146003 respectively.
Santander Sharedealing is a trading name of Abbey Stockbrokers Limited Reg. No.
02666793. Registered Office: Kingfisher House, Radford Way, Billericay, Essex
CM12 0GZ. Authorised and regulated by the Financial Conduct Authority. FCA Reg.
No. 154210. You can check this on the Financial Services Register by visiting
the FCA’s website www.fca.org.uk/register or by contacting the FCA on 0800 111
6768. Santander UK plc is also licensed by the Financial Supervision Commission
of the Isle of Man for its branch in the Isle of Man. Deposits held with the
Isle of Man branch are covered by the Isle of Man Depositors’ Compensation
Scheme as set out in the Isle of Man Depositors’ Compensation Scheme Regulations
2010. In the Isle of Man, Santander UK plc’s principal place of business is at
19/21 Prospect Hill, Douglas, Isle of Man, IM1 1ET. Santander and the flame logo
are registered trademarks.
Santander Asset Finance plc. Reg. No. 1533123. Registered Office: 2 Triton
Square, Regent’s Place, London NW1 3AN. Registered in England. Santander
Corporate & Commercial is a brand name used by Santander UK plc, Abbey National
Treasury Services plc and Santander Asset Finance plc.
Ref:[PDB#1-4A]
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Re: Best practices for sharing/maintaining large resource files for Spark jobs

2016-01-14 Thread Dmitry Goldenberg
The other thing from some folks' recommendations on this list was Apache
Ignite.  Their In-Memory File System (
https://ignite.apache.org/features/igfs.html) looks quite interesting.

On Thu, Jan 14, 2016 at 7:54 AM, Dmitry Goldenberg  wrote:

> OK so it looks like Tachyon is a cluster memory plugin marked as
> "experimental" in Spark.
>
> In any case, we've got a few requirements for the system we're working on
> which may drive the decision for how to implement large resource file
> management.
>
> The system is a framework of N data analyzers which take incoming
> documents as input and transform them or extract some data out of those
> documents.  These analyzers can be chained together which makes it a great
> case for processing with RDD's and a set of map/filter types of Spark
> functions. There's already an established framework API which we want to
> preserve.  This means that most likely, we'll create a relatively thin
> "binding" layer for exposing these analyzers as well-documented functions
> to the end-users who want to use them in a Spark based distributed
> computing environment.
>
> We also want to, ideally, hide the complexity of how these resources are
> loaded from the end-users who will be writing the actual Spark jobs that
> utilize the Spark "binding" functions that we provide.
>
> So, for managing large numbers of small, medium, or large resource files,
> we're considering the below options, with a variety of pros and cons
> attached, from the following perspectives:
>
> a) persistence - where do the resources reside initially;
> b) loading - what are the mechanics for loading of these resources;
> c) caching and sharing across worker nodes.
>
> Possible options:
>
> 1. Load each resource into a broadcast variable. Considering that we have
> scores if not hundreds of these resource files, maintaining that many
> broadcast variables seems like a complexity that's going to be hard to
> manage. We'd also need a translation layer between the broadcast variables
> and the internal API that would want to "speak" InputStream's rather than
> broadcast variables.
>
> 2. Load resources into RDD's and perform join's against them from our
> incoming document data RDD's, thus achieving the effect of a value lookup
> from the resources.  While this seems like a very Spark'y way of doing
> things, the lookup mechanics seem quite non-trivial, especially because
> some of the resources aren't going to be pure dictionaries; they may be
> statistical models.  Additionally, this forces us to utilize Spark's
> semantics for handling of these resources which means a potential rewrite
> of our internal product API. That would be a hard option to go with.
>
> 3. Pre-install all the needed resources on each of the worker nodes;
> retrieve the needed resources from the file system and load them into
> memory as needed. Ideally, the resources would only be installed once, on
> the Spark driver side; we'd want to avoid having to pre-install all these
> files on each node. However, we've done this as an exercise and this
> approach works OK.
>
> 4. Pre-load all the resources into HDFS or S3 i.e. into some distributed
> persistent store; load them into cluster memory from there, as necessary.
> Presumably this could be a pluggable store with a common API exposed.
> Since our framework is an OEM'able product, we could plug and play with a
> variety of such persistent stores via Java's FileSystem/URL scheme handler
> API's.
>
> 5. Implement a Resource management server, with a RESTful interface on
> top. Under the covers, this could be a wrapper on top of #4.  Potentially
> unnecessary if we have a solid persistent store API as per #4.
>
> 6. Beyond persistence, caching also has to be considered for these
> resources. We've considered Tachyon (especially since it's pluggable into
> Spark), Redis, and the like. Ideally, I would think we'd want resources to
> be loaded into the cluster memory as needed; paged in/out on-demand in an
> LRU fashion.  From this perspective, it's not yet clear to me what the best
> option(s) would be. Any thoughts / recommendations would be appreciated.
>
>
>
>
>
> On Tue, Jan 12, 2016 at 3:04 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> Thanks, Gene.
>>
>> Does Spark use Tachyon under the covers anyway for implementing its
>> "cluster memory" support?
>>
>> It seems that the practice I hear the most about is the idea of loading
>> resources as RDD's and then doing join's against them to achieve the lookup
>> effect.
>>
>> The other approach would be to load the resources into broadcast
>> variables but I've heard concerns about memory.  Could we run out of memory
>> if we load too much into broadcast vars?  Is there any memory_to_disk/spill
>> to disk capability for broadcast variables in Spark?
>>
>>
>> On Tue, Jan 12, 2016 at 11:19 AM, Gene Pang  wrote:
>>
>>> Hi Dmitry,
>>>
>>> Yes, Tachyon can help with your use 

code hangs in local master mode

2016-01-14 Thread Kai Wei
Hi list,

I ran into an issue which I think could be a bug.

I have a Hive table stored as parquet files. Let's say it's called
testtable. I found the code below stuck forever in spark-shell with a local
master or driver/executor:
sqlContext.sql("select * from testtable").rdd.cache.zipWithIndex().count

But it works if I use a standalone master.

I also tried several different variants:
don't cache the rdd(works):
sqlContext.sql("select * from testtable").rdd.zipWithIndex().count

cache the rdd after zipWithIndex(works):
sqlContext.sql("select * from testtable").rdd.zipWithIndex().cache.count

use parquet file reader(doesn't work):
sqlContext.read.parquet("hdfs://localhost:8020/user/hive/warehouse/testtable").rdd.cache.zipWithIndex().count

use parquet files on local file system(works)
sqlContext.read.parquet("/tmp/testtable").rdd.cache.zipWithIndex().count

I read the code of zipWithIndex() and looked at the DAG visualization. I think the
function causes Spark to first retrieve and cache n-1 partitions of the target
table, and then the last partition. Something must be going wrong when the
driver/executor tries to read the last partition from HDFS.

I am using spark-1.5.2-bin-hadoop-2.6 on cloudera quickstart vm 5.4.2.

-- 
Kai Wei
Big Data Developer

Pythian - love your data

w...@pythian.com
Tel: +1 613 565 8696 x1579
Mobile: +61 403 572 456


RE: NPE when using Joda DateTime

2016-01-14 Thread Spencer, Alex (Santander)
Hi,

I tried take(1500) and test.collect, and these both work on the "single" map
statement.

I'm very new to Kryo serialisation. I managed to find some code, copied and
pasted it, and that's what originally made the single map statement work:

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
kryo.register(classOf[org.joda.time.DateTime])
  }
}

Is it because the groupBy sees a different class type? Maybe Array[DateTime]? I 
don’t want to find the answer by trial and error though.

Alex

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com] 
Sent: 14 January 2016 14:07
To: Spencer, Alex (Santander)
Cc: user@spark.apache.org
Subject: Re: NPE when using Joda DateTime

It does look somehow like the state of the DateTime object isn't being 
recreated properly on deserialization somehow, given where the NPE occurs (look 
at the Joda source code). However the object is java.io.Serializable. Are you 
sure the Kryo serialization is correct?

It doesn't quite explain why the map operation works by itself. It could be the 
difference between executing locally (take(1) will look at 1 partition in 1 
task which prefers to be local) and executing remotely (groupBy is going to 
need a shuffle).

On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander) 
 wrote:
> Hello,
>
>
>
> I was wondering if somebody is able to help me get to the bottom of a 
> null pointer exception I’m seeing in my code. I’ve managed to narrow 
> down a problem in a larger class to my use of Joda’s DateTime 
> functions. I’ve successfully run my code in scala, but I’ve hit a few 
> problems when adapting it to run in spark.
>
>
>
> Spark version: 1.3.0
>
> Scala version: 2.10.4
>
> Java HotSpot 1.7
>
>
>
> I have a small case class called Transaction, which looks something 
> like
> this:
>
>
>
> case class Transaction(date : org.joda.time.DateTime = new
> org.joda.time.DateTime())
>
>
>
> I have an RDD[Transactions] trans:
>
> org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at
> :44
>
>
>
> I am able to run this successfully:
>
>
>
> val test = trans.map(_.date.minusYears(10))
>
> test.take(1)
>
>
>
> However if I do:
>
>
>
> val groupedTrans = trans.groupBy(_.account)
>
>
>
> //For each group, process transactions in turn:
>
> val test = groupedTrans.flatMap { case (_, transList) =>
>
>   transList.map {transaction =>
>
> transaction.date.minusYears(10)
>
>   }
>
> }
>
> test.take(1)
>
>
>
> I get:
>
>
>
> java.lang.NullPointerException
>
> at org.joda.time.DateTime.minusYears(DateTime.java:1268)
>
>
>
> Should the second operation not be equivalent to the first .map one? 
> (It’s a long way round of producing my error – but it’s extremely 
> similar to what’s happening in my class).
>
>
>
> I’ve got a custom registration class for Kryo which I think is working 
> - before I added this the original .map did not work – but shouldn’t 
> it be able to serialize all instances of Joda DateTime?
>
>
>
> Thank you for any help / pointers you can give me.
>
>
>
> Kind Regards,
>
> Alex.
>
>
>
> Alex Spencer
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


RE: Spark SQL . How to enlarge output rows ?

2016-01-14 Thread Spencer, Alex (Santander)
Hi,

Try …..show(false)

public void show(int numRows, boolean truncate)
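
e.g. (assuming Spark 1.5+, where this overload is available):

sqlContext.sql("select day_time from my_table limit 10").show(10, false)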


Kind Regards,
Alex.

From: Eli Super [mailto:eli.su...@gmail.com]
Sent: 14 January 2016 13:09
To: user@spark.apache.org
Subject: Spark SQL . How to enlarge output rows ?


Hi

After executing sql

sqlContext.sql("select day_time from my_table limit 10").show()

my output looks like  :

++
|  day_time|
++
|2015/12/15 15:52:...|
|2015/12/15 15:53:...|
|2015/12/15 15:52:...|
|2015/12/15 15:52:...|
|2015/12/15 15:52:...|
|2015/12/15 15:52:...|
|2015/12/15 15:51:...|
|2015/12/15 15:52:...|
|2015/12/15 15:52:...|
|2015/12/15 15:53:...|
++



I'd like to get full rows

Thanks !


Re: NPE when using Joda DateTime

2016-01-14 Thread Sean Owen
It does look somehow like the state of the DateTime object isn't being
recreated properly on deserialization somehow, given where the NPE
occurs (look at the Joda source code). However the object is
java.io.Serializable. Are you sure the Kryo serialization is correct?

It doesn't quite explain why the map operation works by itself. It
could be the difference between executing locally (take(1) will look
at 1 partition in 1 task which prefers to be local) and executing
remotely (groupBy is going to need a shuffle).

On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander)
 wrote:
> Hello,
>
>
>
> I was wondering if somebody is able to help me get to the bottom of a null
> pointer exception I’m seeing in my code. I’ve managed to narrow down a
> problem in a larger class to my use of Joda’s DateTime functions. I’ve
> successfully run my code in scala, but I’ve hit a few problems when adapting
> it to run in spark.
>
>
>
> Spark version: 1.3.0
>
> Scala version: 2.10.4
>
> Java HotSpot 1.7
>
>
>
> I have a small case class called Transaction, which looks something like
> this:
>
>
>
> case class Transaction(date : org.joda.time.DateTime = new
> org.joda.time.DateTime())
>
>
>
> I have an RDD[Transactions] trans:
>
> org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at
> :44
>
>
>
> I am able to run this successfully:
>
>
>
> val test = trans.map(_.date.minusYears(10))
>
> test.take(1)
>
>
>
> However if I do:
>
>
>
> val groupedTrans = trans.groupBy(_.account)
>
>
>
> //For each group, process transactions in turn:
>
> val test = groupedTrans.flatMap { case (_, transList) =>
>
>   transList.map {transaction =>
>
> transaction.date.minusYears(10)
>
>   }
>
> }
>
> test.take(1)
>
>
>
> I get:
>
>
>
> java.lang.NullPointerException
>
> at org.joda.time.DateTime.minusYears(DateTime.java:1268)
>
>
>
> Should the second operation not be equivalent to the first .map one? (It’s a
> long way round of producing my error – but it’s extremely similar to what’s
> happening in my class).
>
>
>
> I’ve got a custom registration class for Kryo which I think is working -
> before I added this the original .map did not work – but shouldn’t it be
> able to serialize all instances of Joda DateTime?
>
>
>
> Thank you for any help / pointers you can give me.
>
>
>
> Kind Regards,
>
> Alex.
>
>
>
> Alex Spencer
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark and HBase RDD join/get

2016-01-14 Thread Kristoffer Sjögren
Thanks Ted!

On Thu, Jan 14, 2016 at 4:49 PM, Ted Yu  wrote:
> For #1, yes it is possible.
>
> You can find some example in hbase-spark module of hbase where hbase as
> DataSource is provided.
> e.g.
>
> https://github.com/apache/hbase/blob/master/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseRDDFunctions.scala
>
> Cheers
>
> On Thu, Jan 14, 2016 at 5:04 AM, Kristoffer Sjögren 
> wrote:
>>
>> Hi
>>
>> We have a RDD that needs to be mapped with information from
>> HBase, where the exact key is the user id.
>>
>> What's the different alternatives for doing this?
>>
>> - Is it possible to do HBase.get() requests from a map function in Spark?
>> - Or should we join RDDs with all full HBase table scan?
>>
>> I ask because full table scans feels inefficient, especially if the
>> input RDD is really small compared to the full table. But I
>> realize that a full table scan may not be what happens in reality?
>>
>> Cheers,
>> -Kristoffer
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: NPE when using Joda DateTime

2016-01-14 Thread Sean Owen
That's right, though it's possible the default way Kryo chooses to
serialize the object doesn't work. I'd debug a little more and print
out as much as you can about the DateTime object at the point it
appears to not work. I think there's a real problem and it only
happens to not turn up for the map + take(1) for reasons below.

Sandy, I know you work with DateTime for spark-timeseries; does this ring a bell?

On Thu, Jan 14, 2016 at 2:28 PM, Spencer, Alex (Santander)
 wrote:
> Hi,
>
> I tried take(1500) and test.collect and these both work on the "single" map 
> statement.
>
> I'm very new to Kryo serialisation, I managed to find some code and I copied 
> and pasted and that's what originally made the single map statement work:
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[org.joda.time.DateTime])
>   }
> }
>
> Is it because the groupBy sees a different class type? Maybe Array[DateTime]? 
> I don’t want to find the answer by trial and error though.
>
> Alex
>
> -Original Message-
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: 14 January 2016 14:07
> To: Spencer, Alex (Santander)
> Cc: user@spark.apache.org
> Subject: Re: NPE when using Joda DateTime
>
> It does look somehow like the state of the DateTime object isn't being 
> recreated properly on deserialization somehow, given where the NPE occurs 
> (look at the Joda source code). However the object is java.io.Serializable. 
> Are you sure the Kryo serialization is correct?
>
> It doesn't quite explain why the map operation works by itself. It could be 
> the difference between executing locally (take(1) will look at 1 partition in 
> 1 task which prefers to be local) and executing remotely (groupBy is going to 
> need a shuffle).
>
> On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander) 
>  wrote:
>> Hello,
>>
>>
>>
>> I was wondering if somebody is able to help me get to the bottom of a
>> null pointer exception I’m seeing in my code. I’ve managed to narrow
>> down a problem in a larger class to my use of Joda’s DateTime
>> functions. I’ve successfully run my code in scala, but I’ve hit a few
>> problems when adapting it to run in spark.
>>
>>
>>
>> Spark version: 1.3.0
>>
>> Scala version: 2.10.4
>>
>> Java HotSpot 1.7
>>
>>
>>
>> I have a small case class called Transaction, which looks something
>> like
>> this:
>>
>>
>>
>> case class Transaction(date : org.joda.time.DateTime = new
>> org.joda.time.DateTime())
>>
>>
>>
>> I have an RDD[Transactions] trans:
>>
>> org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at
>> :44
>>
>>
>>
>> I am able to run this successfully:
>>
>>
>>
>> val test = trans.map(_.date.minusYears(10))
>>
>> test.take(1)
>>
>>
>>
>> However if I do:
>>
>>
>>
>> val groupedTrans = trans.groupBy(_.account)
>>
>>
>>
>> //For each group, process transactions in turn:
>>
>> val test = groupedTrans.flatMap { case (_, transList) =>
>>
>>   transList.map {transaction =>
>>
>> transaction.date.minusYears(10)
>>
>>   }
>>
>> }
>>
>> test.take(1)
>>
>>
>>
>> I get:
>>
>>
>>
>> java.lang.NullPointerException
>>
>> at org.joda.time.DateTime.minusYears(DateTime.java:1268)
>>
>>
>>
>> Should the second operation not be equivalent to the first .map one?
>> (It’s a long way round of producing my error – but it’s extremely
>> similar to what’s happening in my class).
>>
>>
>>
>> I’ve got a custom registration class for Kryo which I think is working
>> - before I added this the original .map did not work – but shouldn’t
>> it be able to serialize all instances of Joda DateTime?
>>
>>
>>
>> Thank you for any help / pointers you can give me.
>>
>>
>>
>> Kind Regards,
>>
>> Alex.
>>
>>
>>
>> Alex Spencer
>>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
> commands, e-mail: user-h...@spark.apache.org
>

RE: NPE when using Joda DateTime

2016-01-14 Thread Spencer, Alex (Santander)
I appreciate this – thank you.

I’m not an admin on the box I’m using spark-shell on – so I’m not sure I can 
add them to that namespace. I’m hoping if I declare the JodaDateTimeSerializer 
class in my REPL that I can still get this to work. I think the INTERVAL part 
below may be key, I haven’t tried that yet.

Kind Regards,
Alex.

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: 14 January 2016 16:28
To: Spencer, Alex (Santander)
Cc: Sean Owen; user@spark.apache.org
Subject: Re: NPE when using Joda DateTime

I had a similar problem a while back and leveraged these Kryo serializers, 
https://github.com/magro/kryo-serializers.  I had to fall back to version 0.28,
but that was a while back.  You can register these in your
org.apache.spark.serializer.KryoRegistrator implementation
and then set your registrator in the Spark config:
import org.apache.spark.SparkConf

val sparkConfig = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.yourpackage.YourKryoRegistrator")
  // ... other settings ...

where YourKryoRegistrator is something like:
class YourKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
kryo.register(classOf[org.joda.time.DateTime], new JodaDateTimeSerializer)
kryo.register(classOf[org.joda.time.Interval], new JodaIntervalSerializer)
  }
}
HTH.
-Todd

On Thu, Jan 14, 2016 at 9:28 AM, Spencer, Alex (Santander) 
>
 wrote:
Hi,

I tried take(1500) and test.collect and these both work on the "single" map 
statement.

I'm very new to Kryo serialisation, I managed to find some code and I copied 
and pasted and that's what originally made the single map statement work:

class MyRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
kryo.register(classOf[org.joda.time.DateTime])
  }
}

Is it because the groupBy sees a different class type? Maybe Array[DateTime]? I 
don’t want to find the answer by trial and error though.

Alex

-Original Message-
From: Sean Owen [mailto:so...@cloudera.com]
Sent: 14 January 2016 14:07
To: Spencer, Alex (Santander)
Cc: user@spark.apache.org
Subject: Re: NPE when using Joda DateTime

It does look somehow like the state of the DateTime object isn't being 
recreated properly on deserialization somehow, given where the NPE occurs (look 
at the Joda source code). However the object is java.io.Serializable. Are you 
sure the Kryo serialization is correct?

It doesn't quite explain why the map operation works by itself. It could be the 
difference between executing locally (take(1) will look at 1 partition in 1 
task which prefers to be local) and executing remotely (groupBy is going to 
need a shuffle).

On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander) 
>
 wrote:
> Hello,
>
>
>
> I was wondering if somebody is able to help me get to the bottom of a
> null pointer exception I’m seeing in my code. I’ve managed to narrow
> down a problem in a larger class to my use of Joda’s DateTime
> functions. I’ve successfully run my code in scala, but I’ve hit a few
> problems when adapting it to run in spark.
>
>
>
> Spark version: 1.3.0
>
> Scala version: 2.10.4
>
> Java HotSpot 1.7
>
>
>
> I have a small case class called Transaction, which looks something
> like
> this:
>
>
>
> case class Transaction(date : org.joda.time.DateTime = new
> org.joda.time.DateTime())
>
>
>
> I have an RDD[Transactions] trans:
>
> org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at
> <console>:44
>
>
>
> I am able to run this successfully:
>
>
>
> val test = trans.map(_.date.minusYears(10))
>
> test.take(1)
>
>
>
> However if I do:
>
>
>
> val groupedTrans = trans.groupBy(_.account)
>
>
>
> //For each group, process transactions in turn:
>
> val test = groupedTrans.flatMap { case (_, transList) =>
>
>   transList.map {transaction =>
>
> transaction.date.minusYears(10)
>
>   }
>
> }
>
> test.take(1)
>
>
>
> I get:
>
>
>
> java.lang.NullPointerException
>
> at org.joda.time.DateTime.minusYears(DateTime.java:1268)
>
>
>
> Should the second operation not be equivalent to the first .map one?
> (It’s a long way round of producing my error – but it’s extremely
> similar to what’s happening in my class).
>
>
>
> I’ve got a custom registration class for Kryo which I think is working
> - before I added this the original .map did not work – but shouldn’t
> it be able to serialize all instances of Joda DateTime?
>
>
>
> Thank you for any help / pointers you can give me.
>
>
>
> Kind Regards,
>
> Alex.
>
>
>
> Alex Spencer
>


Re: DataFrameWriter on partitionBy for parquet eat all RAM

2016-01-14 Thread Jerry Lam
Hi Arkadiusz,

partitionBy was not designed to handle many distinct values, at least the
last time I used it. If you search the mailing list, I think a couple of
other people have faced similar issues. For example, in my case, it won't
work over a million distinct user ids: it requires a lot of memory and a
very long time to read the table back.
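
If the column really does have only a modest number of distinct values, one thing that sometimes helps with memory (a sketch, assuming Spark 1.6's repartition-by-column; I haven't tried it on your data) is to shuffle on the partition column before writing, so each task keeps writers open for only a few partition directories instead of one writer per distinct value it happens to see:

import org.apache.spark.sql.functions.col

sqlContext.read.load("hdfs://notpartitioneddata")
  .repartition(col("columnname"))      // co-locate rows with the same value in one task
  .write.partitionBy("columnname")
  .parquet("partitioneddata")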

Best Regards,

Jerry

On Thu, Jan 14, 2016 at 2:31 PM, Arkadiusz Bicz wrote:

> Hi
>
> What is the proper configuration for saving parquet partition with
> large number of repeated keys?
>
> On bellow code I load 500 milion rows of data and partition it on
> column with not so many different values.
>
> Using spark-shell with 30g per executor and driver and 3 executor cores
>
>
> sqlContext.read.load("hdfs://notpartitioneddata").write.partitionBy("columnname").parquet("partitioneddata")
>
>
> Job failed because not enough memory in executor :
>
> WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by
> YARN for exceeding memory limits. 43.5 GB of 43.5 GB physical memory
> used. Consider boosting spark.yarn.executor.memoryOverhead.
> 16/01/14 17:32:38 ERROR YarnScheduler: Lost executor 11 on
> datanode2.babar.poc: Container killed by YARN for exceeding memory
> limits. 43.5 GB of 43.5 GB physical memory used. Consider boosting
> spark.yarn.executor.memoryOverhead.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you change MEMORY_ONLY_SER to MEMORY_AND_DISK_SER_2 and see if this
still happens? It may be because you don't have enough memory to cache the
events.

On Thu, Jan 14, 2016 at 4:06 PM, Lin Zhao  wrote:

> Hi,
>
> I'm testing spark streaming with actor receiver. The actor keeps calling
> store() to save a pair to Spark.
>
> Once the job is launched, on the UI everything looks good. Millions of
> events gets through every batch. However, I added logging to the first step
> and found that only 20 or 40 events in a batch actually gets to the first
> mapper. Any idea what might be causing this?
>
> I also have log in the custom receiver before "store()" call and it's
> really calling this function millions of times.
>
> The receiver definition looks like:
>
> val stream = ssc.actorStream[(String, 
> Message)](MessageRetriever.props("message-retriever",
>   mrSections.head, conf, flowControlDef, None, None), "Martini",
>   StorageLevel.MEMORY_ONLY_SER)
>
>
> The job looks like:
>
> stream.map { pair =>
> logger.info(s"before pipeline key=${pair._1}") // Only a handful gets 
> logged although there are over 1 million in a batch
> pair._2
> }.flatMap { m =>
>   // Event Builder
>   logger.info(s"event builder thread-id=${Thread.currentThread().getId} 
> user=${m.fields.getOrElse('user, "NA")}")
>   ebHelper(m)
> }.map { e =>
>   // Event Normalizer
>   logger.info(s"normalizer thread-id=${Thread.currentThread().getId} 
> user=${e.getFieldAsString('user)}")
>   DefaultEventNormalizer.normalizeFields(e)
> }.map { e =>
>   logger.info(s"resolver thread-id=${Thread.currentThread().getId} 
> user=${e.getFieldAsString('user)}")
>   resolver(e)
> }.flatMap { e =>
>   // Event Discarder
>   logger.info(s"discarder thread-id=${Thread.currentThread().getId} 
> user=${e.getFieldAsString('user)}")
>   discarder(e)
> }.map { e =>
>   ep(e)
> }
>
>


Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Lin Zhao
Hi Shixiong,

I tried this but it still happens. If it helps, it's 1.6.0 and runs on YARN. 
Batch duration is 20 seconds.

Some logs seemingly related to block manager:


16/01/15 00:31:25 INFO receiver.BlockGenerator: Pushed block 
input-0-1452817873000
16/01/15 00:31:27 INFO storage.MemoryStore: Block input-0-1452817879000 stored 
as bytes in memory (estimated size 60.1 MB, free 1563.4 MB)
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 31
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 30
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 29
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 28
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 27
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 26
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 25
16/01/15 00:31:28 INFO receiver.BlockGenerator: Pushed block 
input-0-1452817879000
16/01/15 00:31:28 INFO context.TimeSpanHDFSReader: descr="Waiting to read [win] 
message file(s) for 2015-12-17T21:00:00.000." module=TIMESPAN_HDFS_READER
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 38
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 37
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 36
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 35
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 34
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 33
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 32
16/01/15 00:31:29 INFO context.MessageRetriever: descr="Processed 4,636,461 
lines (80 s); 0 event/s (42,636 incoming msg/s) at 2015-12-17T21" 
module=MESSAGE_RETRIEVER
16/01/15 00:31:30 INFO storage.MemoryStore: Block input-0-1452817879200 stored 
as bytes in memory (estimated size 93.3 MB, free 924.9 MB)
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 45
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 44
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 40
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 39

From: "Shixiong(Ryan) Zhu" 
>
Date: Thursday, January 14, 2016 at 4:13 PM
To: Lin Zhao >
Cc: user >
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data

MEMORY_AND_DISK_SER_2


Understanding Spark Rebalancing

2016-01-14 Thread Pedro Rodriguez
Hi All,

I am running a Spark program where one part uses Spark as a scheduler
rather than as a data management framework. That is, my job can be
described as an RDD[String] where each string describes an operation to
perform, which may be cheap or expensive (processing an object which may
have a small or large number of records associated with it).

Leaving things at the defaults, I get poor job balancing. I am wondering
which approach I should take:
1. Write a partitioner and use partitionBy to balance partitions ahead of
time by the number of records each string needs.
2. Repartition to have many small partitions (I have ~1700 strings acting
as jobs to run, so maybe 1-5 per partition). My question here is, does
Spark re-schedule/steal jobs if there are executors/worker processes that
aren't doing any work?

The second one would be easier and since I am not shuffling much data
around would work just fine for me, but I can't seem to find out for sure
if Spark does job re-scheduling/stealing.
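
To make option 2 concrete, this is roughly what I have in mind (jobDescriptions and runExpensiveJob stand in for my real code):

import org.apache.spark.rdd.RDD

// jobDescriptions: Seq[String] and runExpensiveJob: String => Unit are placeholders.
val workItems: RDD[String] = sc.parallelize(jobDescriptions)

// The hope: with roughly one work item per partition (~1700 here), idle
// executor cores keep pulling queued tasks as they finish the cheap ones.
val balanced = workItems.repartition(workItems.count().toInt)
balanced.foreach(runExpensiveJob)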

Thanks
-- 
Pedro Rodriguez
PhD Student in Distributed Machine Learning | CU Boulder
UC Berkeley AMPLab Alumni

ski.rodrig...@gmail.com | pedrorodriguez.io | 909-353-4423
Github: github.com/EntilZha | LinkedIn:
https://www.linkedin.com/in/pedrorodriguezscience


Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Lin Zhao
Hi Shixiong,

Just figured it out. I was doing a .print() as the output operation, which
only pulls about 10 records through per batch. I changed it to a no-op
foreachRDD and it works.
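
For anyone hitting the same thing, my understanding of what was going on (a sketch, not the exact Spark internals): print() only takes the first handful of elements of each batch RDD, and since the upstream maps are lazy, only those few records ever flow through them. An output operation that touches the whole RDD forces every record through:

import org.apache.spark.streaming.dstream.DStream

def forceFullEvaluation[T](processed: DStream[T]): Unit =
  processed.foreachRDD { rdd =>
    // count() visits every partition, so the whole pipeline runs for the batch
    println(s"batch size = ${rdd.count()}")
  }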

Thanks for jumping in to help me.

From: "Shixiong(Ryan) Zhu" 
>
Date: Thursday, January 14, 2016 at 4:41 PM
To: Lin Zhao >
Cc: user >
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data

Could you post the codes of MessageRetriever? And by the way, could you post 
the screenshot of tasks for a batch and check the input size of these tasks? 
Considering there are so many events, there should be a lot of blocks as well 
as a lot of tasks.

On Thu, Jan 14, 2016 at 4:34 PM, Lin Zhao wrote:
Hi Shixiong,

I tried this but it still happens. If it helps, it's 1.6.0 and runs on YARN. 
Batch duration is 20 seconds.

Some logs seemingly related to block manager:


16/01/15 00:31:25 INFO receiver.BlockGenerator: Pushed block 
input-0-1452817873000
16/01/15 00:31:27 INFO storage.MemoryStore: Block input-0-1452817879000 stored 
as bytes in memory (estimated size 60.1 MB, free 1563.4 MB)
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 31
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 30
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 29
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 28
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 27
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 26
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 25
16/01/15 00:31:28 INFO receiver.BlockGenerator: Pushed block 
input-0-1452817879000
16/01/15 00:31:28 INFO context.TimeSpanHDFSReader: descr="Waiting to read [win] 
message file(s) for 2015-12-17T21:00:00.000." module=TIMESPAN_HDFS_READER
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 38
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 37
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 36
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 35
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 34
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 33
16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 32
16/01/15 00:31:29 INFO context.MessageRetriever: descr="Processed 4,636,461 
lines (80 s); 0 event/s (42,636 incoming msg/s) at 2015-12-17T21" 
module=MESSAGE_RETRIEVER
16/01/15 00:31:30 INFO storage.MemoryStore: Block input-0-1452817879200 stored 
as bytes in memory (estimated size 93.3 MB, free 924.9 MB)
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 45
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 44
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 40
16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 39

From: "Shixiong(Ryan) Zhu" 
>
Date: Thursday, January 14, 2016 at 4:13 PM
To: Lin Zhao >
Cc: user >
Subject: Re: Spark Streaming: custom actor receiver losing vast majority of data

MEMORY_AND_DISK_SER_2



Re: Random Forest FeatureImportance throwing NullPointerException

2016-01-14 Thread Bryan Cutler
If you are able to just train the RandomForestClassificationModel from ML
directly instead of training the old model and converting, then that would
be the way to go.
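
For example (a sketch in Scala for brevity, assuming a DataFrame `training` with the usual "label"/"features" columns and reusing your hyperparameter names):

import org.apache.spark.ml.classification.RandomForestClassifier

val rfc = new RandomForestClassifier()
  .setNumTrees(numTrees)
  .setMaxBins(maxBinSize)
  .setMaxDepth(maxTreeDepth)
  .setSeed(seed)
  .setImpurity(impurity)

// fit() computes the impurity stats while training, so featureImportances is populated
val model = rfc.fit(training)
println(model.featureImportances)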

On Thu, Jan 14, 2016 at 2:21 PM, 
wrote:

> Thanks so much Bryan for your response.  Is there any workaround?
>
>
>
> *From:* Bryan Cutler [mailto:cutl...@gmail.com]
> *Sent:* Thursday, January 14, 2016 2:19 PM
> *To:* Rachana Srivastava
> *Cc:* user@spark.apache.org; d...@spark.apache.org
> *Subject:* Re: Random Forest FeatureImportance throwing
> NullPointerException
>
>
>
> Hi Rachana,
>
> I got the same exception.  It is because computing the feature importance
> depends on impurity stats, which is not calculated with the old
> RandomForestModel in MLlib.  Feel free to create a JIRA for this if you
> think it is necessary, otherwise I believe this problem will be eventually
> solved as part of this JIRA
> https://issues.apache.org/jira/browse/SPARK-12183
>
> Bryan
>
>
>
> On Thu, Jan 14, 2016 at 8:12 AM, Rachana Srivastava <
> rachana.srivast...@markmonitor.com> wrote:
>
> Tried using 1.6 version of Spark that takes numberOfFeatures fifth
> argument in  the API but still getting featureImportance as null.
>
>
>
> RandomForestClassifier rfc = *getRandomForestClassifier*( numTrees,
> maxBinSize,  maxTreeDepth,  seed,  impurity);
>
> RandomForestClassificationModel rfm = RandomForestClassificationModel.
> *fromOld*(model, rfc, categoricalFeatures, numberOfClasses,
> numberOfFeatures);
>
> System.*out*.println(rfm.featureImportances());
>
>
>
> Stack Trace:
>
> Exception in thread "main" *java.lang.NullPointerException*
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$.computeFeatureImportance(RandomForest.scala:1152)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:1108)
>
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$.featureImportances(RandomForest.scala:1108)
>
> at
> org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances$lzycompute(RandomForestClassifier.scala:237)
>
> at
> org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances(RandomForestClassifier.scala:237)
>
> at
> com.markmonitor.antifraud.ce.ml.CheckFeatureImportance.main(
> *CheckFeatureImportance.java:49*)
>
>
>
> *From:* Rachana Srivastava
> *Sent:* Wednesday, January 13, 2016 3:30 PM
> *To:* 'user@spark.apache.org'; 'd...@spark.apache.org'
> *Subject:* Random Forest FeatureImportance throwing NullPointerException
>
>
>
> I have a Random forest model for which I am trying to get the
> featureImportance vector.
>
>
>
> Map categoricalFeaturesParam = *new* HashMap<>();
>
> scala.collection.immutable.Map categoricalFeatures =
>  (scala.collection.immutable.Map)
>
> scala.collection.immutable.Map$.*MODULE$*.apply(JavaConversions.
> *mapAsScalaMap*(categoricalFeaturesParam).toSeq());
>
> *int* numberOfClasses =2;
>
> RandomForestClassifier rfc = *new* RandomForestClassifier();
>
> RandomForestClassificationModel rfm = RandomForestClassificationModel.
> *fromOld*(model, rfc, categoricalFeatures, numberOfClasses);
>
> System.*out*.println(rfm.featureImportances());
>
>
>
> When I run above code I found featureImportance as null.  Do I need to set
> anything in specific to get the feature importance for the random forest
> model.
>
>
>
> Thanks,
>
>
>
> Rachana
>
>
>


How to bind webui to localhost?

2016-01-14 Thread Zee Chen
Hi, what is the easiest way to configure the Spark webui to bind to
localhost or 127.0.0.1? I intend to use this with ssh socks proxy to
provide a rudimentary "secured access". Unlike hadoop config options,
Spark doesn't allow the user to directly specify the ip addr to bind
services to.




Re: How to bind webui to localhost?

2016-01-14 Thread Shixiong(Ryan) Zhu
Yeah, it's hard-coded as "0.0.0.0". Could you send a PR to add a
configuration for it?

On Thu, Jan 14, 2016 at 2:51 PM, Zee Chen  wrote:

> Hi, what is the easiest way to configure the Spark webui to bind to
> localhost or 127.0.0.1? I intend to use this with ssh socks proxy to
> provide a rudimentary "secured access". Unlike hadoop config options,
> Spark doesn't allow the user to directly specify the ip addr to bind
> services to.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Sending large objects to specific RDDs

2016-01-14 Thread Daniel Imberman
Hi Ted,

So unfortunately, after looking into the cluster manager that I will be
using for my testing (I'm using a super-computer called XSEDE rather than
AWS), it looks like the cluster does not actually come with HBase installed
(this cluster is becoming somewhat problematic, as it is essentially AWS
but you have to do your own virtualization scripts). Do you have any other
thoughts on how I could go about dealing with this purely using Spark and
HDFS?
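
One Spark-and-HDFS-only idea I have been sketching (using the SparseVector/InvIndex types from my original mail, and assuming both RDDs are keyed by the same partition key):

import org.apache.spark.HashPartitioner

// Co-partition both RDDs so a vector and the inverted index for its key
// always end up in the same partition.
val numParts = 64 // illustrative
val partitioner = new HashPartitioner(numParts)
val vecsByPart = vectors.partitionBy(partitioner)
val idxByPart = invertedIndexes.partitionBy(partitioner)

// Each task materialises only the index(es) that live in its partition,
// rather than a broadcast of every index to every executor.
val similarities = vecsByPart.zipPartitions(idxByPart, preservesPartitioning = true) {
  (vecIter, idxIter) =>
    val localIndexes = idxIter.toMap // typically just one or a few entries
    vecIter.map { case (key, v) => localIndexes(key).calculateSimilarity(v) }
}

Does that seem reasonable, or am I missing an obvious drawback?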

Thank you

On Wed, Jan 13, 2016 at 11:49 AM Daniel Imberman 
wrote:

> Thank you Ted! That sounds like it would probably be the most efficient
> (with the least overhead) way of handling this situation.
>
> On Wed, Jan 13, 2016 at 11:36 AM Ted Yu  wrote:
>
>> Another approach is to store the objects in NoSQL store such as HBase.
>>
>> Looking up object should be very fast.
>>
>> Cheers
>>
>> On Wed, Jan 13, 2016 at 11:29 AM, Daniel Imberman <
>> daniel.imber...@gmail.com> wrote:
>>
>>> I'm looking for a way to send structures to pre-determined partitions so
>>> that
>>> they can be used by another RDD in a mapPartition.
>>>
>>> Essentially I'm given and RDD of SparseVectors and an RDD of inverted
>>> indexes. The inverted index objects are quite large.
>>>
>>> My hope is to do a MapPartitions within the RDD of vectors where I can
>>> compare each vector to the inverted index. The issue is that I only NEED
>>> one
>>> inverted index object per partition (which would have the same key as the
>>> values within that partition).
>>>
>>>
>>> val vectors:RDD[(Int, SparseVector)]
>>>
>>> val invertedIndexes:RDD[(Int, InvIndex)] =
>>> a.reduceByKey(generateInvertedIndex)
>>> vectors:RDD.mapPartitions{
>>> iter =>
>>>  val invIndex = invertedIndexes(samePartitionKey)
>>>  iter.map(invIndex.calculateSimilarity(_))
>>>  )
>>> }
>>>
>>> How could I go about setting up the Partition such that the specific data
>>> structure I need will be present for the mapPartition but I won't have
>>> the
>>> extra overhead of sending over all values (which would happen if I were
>>> to
>>> make a broadcast variable).
>>>
>>> One thought I have been having is to store the objects in HDFS but I'm
>>> not
>>> sure if that would be a suboptimal solution (It seems like it could slow
>>> down the process a lot)
>>>
>>> Another thought I am currently exploring is whether there is some way I
>>> can
>>> create a custom Partition or Partitioner that could hold the data
>>> structure
>>> (Although that might get too complicated and become problematic)
>>>
>>> Any thoughts on how I could attack this issue would be highly
>>> appreciated.
>>>
>>> thank you for your help!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Sending-large-objects-to-specific-RDDs-tp25967.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>


Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Lin Zhao
Hi,

I'm testing spark streaming with actor receiver. The actor keeps calling 
store() to save a pair to Spark.

Once the job is launched, on the UI everything looks good. Millions of events 
gets through every batch. However, I added logging to the first step and found 
that only 20 or 40 events in a batch actually gets to the first mapper. Any 
idea what might be causing this?

I also have log in the custom receiver before "store()" call and it's really 
calling this function millions of times.

The receiver definition looks like:


val stream = ssc.actorStream[(String, 
Message)](MessageRetriever.props("message-retriever",
  mrSections.head, conf, flowControlDef, None, None), "Martini",
  StorageLevel.MEMORY_ONLY_SER)


The job looks like:

stream.map { pair =>
logger.info(s"before pipeline key=${pair._1}") // Only a handful gets 
logged although there are over 1 million in a batch
pair._2
}.flatMap { m =>
  // Event Builder
  logger.info(s"event builder thread-id=${Thread.currentThread().getId} 
user=${m.fields.getOrElse('user, "NA")}")
  ebHelper(m)
}.map { e =>
  // Event Normalizer
  logger.info(s"normalizer thread-id=${Thread.currentThread().getId} 
user=${e.getFieldAsString('user)}")
  DefaultEventNormalizer.normalizeFields(e)
}.map { e =>
  logger.info(s"resolver thread-id=${Thread.currentThread().getId} 
user=${e.getFieldAsString('user)}")
  resolver(e)
}.flatMap { e =>
  // Event Discarder
  logger.info(s"discarder thread-id=${Thread.currentThread().getId} 
user=${e.getFieldAsString('user)}")
  discarder(e)
}.map { e =>
  ep(e)
}


Re: How to bind webui to localhost?

2016-01-14 Thread Zee Chen
sure will do.

On Thu, Jan 14, 2016 at 3:19 PM, Shixiong(Ryan) Zhu
 wrote:
> Yeah, it's hard code as "0.0.0.0". Could you send a PR to add a
> configuration for it?
>
> On Thu, Jan 14, 2016 at 2:51 PM, Zee Chen  wrote:
>>
>> Hi, what is the easiest way to configure the Spark webui to bind to
>> localhost or 127.0.0.1? I intend to use this with ssh socks proxy to
>> provide a rudimentary "secured access". Unlike hadoop config options,
>> Spark doesn't allow the user to directly specify the ip addr to bind
>> services to.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>




Re: Spark Streaming: custom actor receiver losing vast majority of data

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you post the codes of MessageRetriever? And by the way, could you
post the screenshot of tasks for a batch and check the input size of these
tasks? Considering there are so many events, there should be a lot of
blocks as well as a lot of tasks.

On Thu, Jan 14, 2016 at 4:34 PM, Lin Zhao  wrote:

> Hi Shixiong,
>
> I tried this but it still happens. If it helps, it's 1.6.0 and runs on
> YARN. Batch duration is 20 seconds.
>
> Some logs seemingly related to block manager:
>
> 16/01/15 00:31:25 INFO receiver.BlockGenerator: Pushed block 
> input-0-1452817873000
> 16/01/15 00:31:27 INFO storage.MemoryStore: Block input-0-1452817879000 
> stored as bytes in memory (estimated size 60.1 MB, free 1563.4 MB)
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 31
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 30
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 29
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 28
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 27
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 26
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 25
> 16/01/15 00:31:28 INFO receiver.BlockGenerator: Pushed block 
> input-0-1452817879000
> 16/01/15 00:31:28 INFO context.TimeSpanHDFSReader: descr="Waiting to read 
> [win] message file(s) for 2015-12-17T21:00:00.000." 
> module=TIMESPAN_HDFS_READER
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 38
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 37
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 36
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 35
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 34
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 33
> 16/01/15 00:31:28 INFO storage.BlockManager: Removing RDD 32
> 16/01/15 00:31:29 INFO context.MessageRetriever: descr="Processed 4,636,461 
> lines (80 s); 0 event/s (42,636 incoming msg/s) at 2015-12-17T21" 
> module=MESSAGE_RETRIEVER
> 16/01/15 00:31:30 INFO storage.MemoryStore: Block input-0-1452817879200 
> stored as bytes in memory (estimated size 93.3 MB, free 924.9 MB)
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 45
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 44
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 43
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 42
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 41
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 40
> 16/01/15 00:31:31 INFO storage.BlockManager: Removing RDD 39
>
>
> From: "Shixiong(Ryan) Zhu" 
> Date: Thursday, January 14, 2016 at 4:13 PM
> To: Lin Zhao 
> Cc: user 
> Subject: Re: Spark Streaming: custom actor receiver losing vast majority
> of data
>
> MEMORY_AND_DISK_SER_2
>


Re: Random Forest FeatureImportance throwing NullPointerException

2016-01-14 Thread Bryan Cutler
Hi Rachana,

I got the same exception.  It is because computing the feature importance
depends on impurity stats, which is not calculated with the old
RandomForestModel in MLlib.  Feel free to create a JIRA for this if you
think it is necessary, otherwise I believe this problem will be eventually
solved as part of this JIRA
https://issues.apache.org/jira/browse/SPARK-12183

Bryan

On Thu, Jan 14, 2016 at 8:12 AM, Rachana Srivastava <
rachana.srivast...@markmonitor.com> wrote:

> Tried using 1.6 version of Spark that takes numberOfFeatures fifth
> argument in  the API but still getting featureImportance as null.
>
>
>
> RandomForestClassifier rfc = *getRandomForestClassifier*( numTrees,
> maxBinSize,  maxTreeDepth,  seed,  impurity);
>
> RandomForestClassificationModel rfm = RandomForestClassificationModel.
> *fromOld*(model, rfc, categoricalFeatures, numberOfClasses,
> numberOfFeatures);
>
> System.*out*.println(rfm.featureImportances());
>
>
>
> Stack Trace:
>
> Exception in thread "main" *java.lang.NullPointerException*
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$.computeFeatureImportance(RandomForest.scala:1152)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$$anonfun$featureImportances$1.apply(RandomForest.scala:1108)
>
> at
> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>
> at
> scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
>
> at
> org.apache.spark.ml.tree.impl.RandomForest$.featureImportances(RandomForest.scala:1108)
>
> at
> org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances$lzycompute(RandomForestClassifier.scala:237)
>
> at
> org.apache.spark.ml.classification.RandomForestClassificationModel.featureImportances(RandomForestClassifier.scala:237)
>
> at
> com.markmonitor.antifraud.ce.ml.CheckFeatureImportance.main(
> *CheckFeatureImportance.java:49*)
>
>
>
> *From:* Rachana Srivastava
> *Sent:* Wednesday, January 13, 2016 3:30 PM
> *To:* 'user@spark.apache.org'; 'd...@spark.apache.org'
> *Subject:* Random Forest FeatureImportance throwing NullPointerException
>
>
>
> I have a Random forest model for which I am trying to get the
> featureImportance vector.
>
>
>
> Map categoricalFeaturesParam = *new* HashMap<>();
>
> scala.collection.immutable.Map categoricalFeatures =
>  (scala.collection.immutable.Map)
>
> scala.collection.immutable.Map$.*MODULE$*.apply(JavaConversions.
> *mapAsScalaMap*(categoricalFeaturesParam).toSeq());
>
> *int* numberOfClasses =2;
>
> RandomForestClassifier rfc = *new* RandomForestClassifier();
>
> RandomForestClassificationModel rfm = RandomForestClassificationModel.
> *fromOld*(model, rfc, categoricalFeatures, numberOfClasses);
>
> System.*out*.println(rfm.featureImportances());
>
>
>
> When I run above code I found featureImportance as null.  Do I need to set
> anything in specific to get the feature importance for the random forest
> model.
>
>
>
> Thanks,
>
>
>
> Rachana
>


Using JDBC clients with "Spark on Hive"

2016-01-14 Thread sdevashis
Hello Experts,

I am getting started with Hive with Spark as the query engine. I built the
package from sources. I am able to invoke the Hive CLI and run queries, and
I can see in Ambari that Spark applications are being created, confirming
Hive is using Spark as the engine.

However, other than the Hive CLI, I am not able to run queries from any
other clients that use JDBC to connect to Hive through Thrift. I tried
Squirrel, Aginity Netezza workbench, and even Hue.

No YARN applications are created, and the query times out after some time.
Nothing gets into /tmp/user/hive.log. Am I missing something?

Again, I am using Hive on Spark and not Spark SQL.

Version Info:
Spark 1.4.1 built for Hadoop 2.4


Thank you in advance for any pointers.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Using-JDBC-clients-with-Spark-on-Hive-tp25976.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: SQL UDF problem (with re to types)

2016-01-14 Thread Michael Armbrust
We automatically convert types for UDFs defined in Scala, but we can't do
it in Java because the types are erased by the compiler.  If you want to
use double you should cast before calling the UDF.
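
For example (in Scala syntax for brevity; the literal 82.4 is otherwise parsed as a decimal, which is what produces the BigDecimal-to-Double ClassCastException you saw):

// Cast the decimal literal so the SQL type matches the UDF's declared Double argument.
val result = sqlCtx.sql("SELECT Test(CAST(82.4 AS DOUBLE))").first()
println(result)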

On Wed, Jan 13, 2016 at 8:10 PM, Raghu Ganti  wrote:

> So, when I try BigDecimal, it works. But, should it not parse based on
> what the UDF defines? Am I missing something here?
>
> On Wed, Jan 13, 2016 at 4:57 PM, Ted Yu  wrote:
>
>> Please take a look
>> at 
>> sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java
>> which shows a UserDefinedAggregateFunction that works on DoubleType column.
>>
>> sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
>> shows how it is registered.
>>
>> Cheers
>>
>> On Wed, Jan 13, 2016 at 11:58 AM, raghukiran 
>> wrote:
>>
>>> While registering and using SQL UDFs, I am running into the following
>>> problem:
>>>
>>> UDF registered:
>>>
>>> ctx.udf().register("Test", new UDF1() {
>>> /**
>>>  *
>>>  */
>>> private static final long serialVersionUID =
>>> -8231917155671435931L;
>>>
>>> public String call(Double x) throws Exception {
>>> return "testing";
>>> }
>>> }, DataTypes.StringType);
>>>
>>> Usage:
>>> query = "SELECT Test(82.4)";
>>> result = sqlCtx.sql(query).first();
>>> System.out.println(result.toString());
>>>
>>> Problem: Class Cast exception thrown
>>> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be
>>> cast
>>> to java.lang.Double
>>>
>>> This problem occurs with Spark v1.5.2 and 1.6.0.
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


Re: strange behavior in spark yarn-client mode

2016-01-14 Thread Marcelo Vanzin
On Thu, Jan 14, 2016 at 10:17 AM, Sanjeev Verma
 wrote:
> now it spawn a single executors with 1060M size, I am not able to understand
> why this time it executes executors with 1G+overhead not 2G what I
> specified.

Where are you looking for the memory size for the container?

-- 
Marcelo




Re: SQL UDF problem (with re to types)

2016-01-14 Thread Raghu Ganti
Would this go away if the Spark source was compiled against Java 1.8 (since
the problem of type erasure is solved through proper generics
implementation in Java 1.8)?

On Thu, Jan 14, 2016 at 1:42 PM, Michael Armbrust 
wrote:

> We automatically convert types for UDFs defined in Scala, but we can't do
> it in Java because the types are erased by the compiler.  If you want to
> use double you should cast before calling the UDF.
>
> On Wed, Jan 13, 2016 at 8:10 PM, Raghu Ganti  wrote:
>
>> So, when I try BigDecimal, it works. But, should it not parse based on
>> what the UDF defines? Am I missing something here?
>>
>> On Wed, Jan 13, 2016 at 4:57 PM, Ted Yu  wrote:
>>
>>> Please take a look
>>> at 
>>> sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java
>>> which shows a UserDefinedAggregateFunction that works on DoubleType column.
>>>
>>> sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
>>> shows how it is registered.
>>>
>>> Cheers
>>>
>>> On Wed, Jan 13, 2016 at 11:58 AM, raghukiran 
>>> wrote:
>>>
 While registering and using SQL UDFs, I am running into the following
 problem:

 UDF registered:

 ctx.udf().register("Test", new UDF1() {
 /**
  *
  */
 private static final long serialVersionUID =
 -8231917155671435931L;

 public String call(Double x) throws Exception {
 return "testing";
 }
 }, DataTypes.StringType);

 Usage:
 query = "SELECT Test(82.4)";
 result = sqlCtx.sql(query).first();
 System.out.println(result.toString());

 Problem: Class Cast exception thrown
 Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot be
 cast
 to java.lang.Double

 This problem occurs with Spark v1.5.2 and 1.6.0.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: strange behavior in spark yarn-client mode

2016-01-14 Thread Marcelo Vanzin
Please reply to the list.

The web ui does not show the total size of the executor's heap. It
shows the amount of memory available for caching data, which is, give
or take, 60% of the heap by default.
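
As a rough back-of-the-envelope check (assuming the pre-1.6 "legacy" memory manager defaults; exact numbers depend on the JVM):

// storage memory shown in the UI ≈ Runtime.maxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
val approxMaxMemoryMb = 1966.0            // what the JVM typically reports for -Xmx2g (varies)
val storageMb = approxMaxMemoryMb * 0.6 * 0.9
println(f"$storageMb%.0f MB")             // ≈ 1062 MB, i.e. the ~1060 MB shown for a 2g executor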

On Thu, Jan 14, 2016 at 11:03 AM, Sanjeev Verma
 wrote:
> I am looking into the web ui of spark application master(tab executors).
>
> On Fri, Jan 15, 2016 at 12:08 AM, Marcelo Vanzin 
> wrote:
>>
>> On Thu, Jan 14, 2016 at 10:17 AM, Sanjeev Verma
>>  wrote:
>> > now it spawn a single executors with 1060M size, I am not able to
>> > understand
>> > why this time it executes executors with 1G+overhead not 2G what I
>> > specified.
>>
>> Where are you looking for the memory size for the container?
>>
>> --
>> Marcelo
>
>



-- 
Marcelo




strange behavior in spark yarn-client mode

2016-01-14 Thread Sanjeev Verma
I am seeing strange behaviour while running Spark in yarn-client mode. I am
observing this on a single-node YARN cluster. In spark-defaults I have
configured the executor memory as 2g and started the spark shell as follows:

bin/spark-shell --master yarn-client

which triggers 2 executors on the node with 1060MB of memory each. I was
able to figure out that if you don't specify --num-executors it will spawn
2 executors on the node by default.


Now when I try to run it again with

bin/spark-shell --master yarn-client --num-executors 1

it spawns a single executor with 1060M size. I am not able to understand
why this time it runs the executor with 1G+overhead and not the 2G I
specified.

Why am I seeing this strange behavior?


DataFrameWriter on partitionBy for parquet eat all RAM

2016-01-14 Thread Arkadiusz Bicz
Hi

What is the proper configuration for saving a parquet partition with a
large number of repeated keys?

In the code below I load 500 million rows of data and partition it on a
column with not so many different values.

Using spark-shell with 30g per executor and driver and 3 executor cores

sqlContext.read.load("hdfs://notpartitioneddata").write.partitionBy("columnname").parquet("partitioneddata")


Job failed because not enough memory in executor :

WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by
YARN for exceeding memory limits. 43.5 GB of 43.5 GB physical memory
used. Consider boosting spark.yarn.executor.memoryOverhead.
16/01/14 17:32:38 ERROR YarnScheduler: Lost executor 11 on
datanode2.babar.poc: Container killed by YARN for exceeding memory
limits. 43.5 GB of 43.5 GB physical memory used. Consider boosting
spark.yarn.executor.memoryOverhead.




Re: code hangs in local master mode

2016-01-14 Thread Kai Wei
Thanks for your reply, Ted.

Below is the stack dump for all threads:

Thread dump for executor driver

Updated at 2016/01/14 20:35:41

Thread 89: Executor task launch worker-0 (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 90: Executor task launch worker-1 (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 91: Executor task launch worker-2 (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 92: Executor task launch worker-3 (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:359)
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:942)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 37: BLOCK_MANAGER cleanup timer (WAITING)

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:503)
java.util.TimerThread.mainLoop(Timer.java:526)
java.util.TimerThread.run(Timer.java:505)

Thread 38: BROADCAST_VARS cleanup timer (WAITING)

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:503)
java.util.TimerThread.mainLoop(Timer.java:526)
java.util.TimerThread.run(Timer.java:505)

Thread 61: dag-scheduler-event-loop (WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
java.util.concurrent.LinkedBlockingDeque.takeFirst(LinkedBlockingDeque.java:489)
java.util.concurrent.LinkedBlockingDeque.take(LinkedBlockingDeque.java:678)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:46)

Thread 62: driver-heartbeater (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Thread 3: Finalizer (WAITING)

java.lang.Object.wait(Native Method)
java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:135)
java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:151)
java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)

Thread 59: heartbeat-receiver-event-loop-thread (TIMED_WAITING)

sun.misc.Unsafe.park(Native Method)

RE: RE: RE: spark streaming context trigger invoke stop why?

2016-01-14 Thread Triones,Deng(vip.com)
Thanks for your response.
Our code is as below:


public void process() {
    logger.info("streaming process start !!!");

    SparkConf sparkConf = createSparkConf(this.getClass().getSimpleName());

    JavaStreamingContext jsc = this.createJavaStreamingContext(sparkConf);

    if (this.streamingListener != null) {
        jsc.addStreamingListener(this.streamingListener);
    }
    JavaPairDStream allKafkaWindowData = this.sparkReceiverDStream.createReceiverDStream(jsc,
            this.streamingConf.getWindowDuration(), this.streamingConf.getSlideDuration());

    this.businessProcess(allKafkaWindowData);
    this.sleep();
    jsc.start();
    jsc.awaitTermination();
}


From: Shixiong(Ryan) Zhu [mailto:shixi...@databricks.com]
Sent: 15 January 2016 6:02
To: 邓刚[技术中心]
Cc: Yogesh Mahajan; user
Subject: Re: RE: RE: spark streaming context trigger invoke stop why?

Could you show your codes? Did you use `StreamingContext.awaitTermination`? If 
so, it will return if any exception happens.

On Wed, Jan 13, 2016 at 11:47 PM, Triones,Deng(vip.com) wrote:
What's more, I am running a 24/7 job, so I won't call System.exit() myself.
So I believe somewhere the driver is killing itself.

From: 邓刚[技术中心]
Sent: 14 January 2016 15:45
To: 'Yogesh Mahajan'
Cc: user
Subject: RE: RE: spark streaming context trigger invoke stop why?

Thanks for your response. ApplicationMaster is only for YARN mode, and I am
using standalone mode. Could you kindly let me know what triggers the
shutdown hook?

From: Yogesh Mahajan [mailto:ymaha...@snappydata.io]
Sent: 14 January 2016 12:42
To: 邓刚[技术中心]
Cc: user
Subject: Re: RE: spark streaming context trigger invoke stop why?

All the action happens in ApplicationMaster, especially in its run method.
Check ApplicationMaster#startUserApplication: the userThread (the driver) invokes the
ApplicationMaster#finish method. You can also try System.exit in your program.

Regards,
Yogesh Mahajan,
SnappyData Inc, snappydata.io

On Thu, Jan 14, 2016 at 9:56 AM, Yogesh Mahajan wrote:
Hi Triones,

Check the org.apache.spark.util.ShutdownHookManager : It adds this ShutDownHook 
when you start a StreamingContext

Here is the code in StreamingContext.start()

shutdownHookRef = ShutdownHookManager.addShutdownHook(
  StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)

Also look at the following def in StreamingContext, which actually stops the
context from the shutdown hook:

private def stopOnShutdown(): Unit = {
  val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
  logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")
  // Do not stop SparkContext, let its own shutdown hook stop it
  stop(stopSparkContext = false, stopGracefully = stopGracefully)
}

Regards,
Yogesh Mahajan,
SnappyData Inc, snappydata.io

On Thu, Jan 14, 2016 at 8:55 AM, Triones,Deng(vip.com) wrote:
More info

I am using spark version 1.5.2


From: Triones,Deng(vip.com) [mailto:triones.d...@vipshop.com]
Sent: 14 January 2016 11:24
To: user
Subject: spark streaming context trigger invoke stop why?

Hi all,
As I saw in the driver log, a task failed 4 times in a stage, and the stage
was dropped because the input block was deleted before it could be used.
After that the StreamingContext invoked stop. Does anyone know what kind of
Akka message triggers the stop, or which code triggers the shutdown hook?


Thanks




Driver log:

 Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
[org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking 
stop(stopGracefully=false) from shutdown hook

livy test problem: Failed to execute goal org.scalatest:scalatest-maven-plugin:1.0:test (test) on project livy-spark_2.10: There are test failures

2016-01-14 Thread Ruslan Dautkhanov
The Livy build test from master fails with the problem below; I can't track it down.

YARN shows the Livy Spark YARN application as running, although an attempt
to connect to the application master shows connection refused:

HTTP ERROR 500
> Problem accessing /proxy/application_1448640910222_0046/. Reason:
> Connection refused
> Caused by:
> java.net.ConnectException: Connection refused
> at java.net.PlainSocketImpl.socketConnect(Native Method)
> at
> java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
> at
> java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
> at
> java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> at java.net.Socket.connect(Socket.java:579)


Not sure if Livy server has application master UI?

CDH 5.5.1.

Below mvn test output footer:



> [INFO]
> 
> [INFO] Reactor Summary:
> [INFO]
> [INFO] livy-main .. SUCCESS [
>  1.299 s]
> [INFO] livy-api_2.10 .. SUCCESS [
>  3.622 s]
> [INFO] livy-client-common_2.10  SUCCESS [
>  0.862 s]
> [INFO] livy-client-local_2.10 . SUCCESS [
> 23.866 s]
> [INFO] livy-core_2.10 . SUCCESS [
>  0.316 s]
> [INFO] livy-repl_2.10 . SUCCESS [01:00
> min]
> [INFO] livy-yarn_2.10 . SUCCESS [
>  0.215 s]
> [INFO] livy-spark_2.10  FAILURE [
> 17.382 s]
> [INFO] livy-server_2.10 ... SKIPPED
> [INFO] livy-assembly_2.10 . SKIPPED
> [INFO] livy-client-http_2.10 .. SKIPPED
> [INFO]
> 
> [INFO] BUILD FAILURE
> [INFO]
> 
> [INFO] Total time: 01:48 min
> [INFO] Finished at: 2016-01-14T14:34:28-07:00
> [INFO] Final Memory: 27M/453M
> [INFO]
> 
> [ERROR] Failed to execute goal
> org.scalatest:scalatest-maven-plugin:1.0:test (test) on project
> livy-spark_2.10: There are test failures -> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
> goal org.scalatest:scalatest-maven-plugin:1.0:test (test) on project
> livy-spark_2.10: There are test failures
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:212)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> at
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> at
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> at
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:128)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:307)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:193)
> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:106)
> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:863)
> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:288)
> at org.apache.maven.cli.MavenCli.main(MavenCli.java:199)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
> Caused by: org.apache.maven.plugin.MojoFailureException: There are test
> failures
> at org.scalatest.tools.maven.TestMojo.execute(TestMojo.java:107)
> at
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:134)
> at
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:207)
> ... 20 more
> [ERROR]
> [ERROR] Re-run Maven using the -X switch to enable full debug logging.

DataFrame partitionBy to a single Parquet file (per partition)

2016-01-14 Thread Patrick McGloin
Hi,

I would like to repartition / coalesce my data so that it is saved into one
Parquet file per partition. I would also like to use the Spark SQL
partitionBy API. So I could do that like this:

df.coalesce(1).write.partitionBy("entity", "year", "month", "day",
"status").mode(SaveMode.Append).parquet(s"$location")

I've tested this and it doesn't seem to perform well. This is because there
is only one partition to work on in the dataset and all the partitioning,
compression and saving of files has to be done by one CPU core.

I could rewrite this to do the partitioning manually (using filter with the
distinct partition values for example) before calling coalesce.

But is there a better way to do this using the standard Spark SQL API?
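
One option I have been wondering about (assuming Spark 1.6's repartition by columns; I have not benchmarked it) is to shuffle on the same columns first, so that every partition value ends up in exactly one task and the writes still run in parallel:

import org.apache.spark.sql.functions.col

df.repartition(col("entity"), col("year"), col("month"), col("day"), col("status"))
  .write.partitionBy("entity", "year", "month", "day", "status")
  .mode(SaveMode.Append).parquet(s"$location")

Would that be considered the idiomatic way, or is there something better?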

Best regards,

Patrick


Re: NPE when using Joda DateTime

2016-01-14 Thread Durgesh Verma
Today is my day... Trying to go thru where I can pitch in. Let me know if below 
makes sense.

I looked at joda Java Api source code (1.2.9) and traced that call in NPE. It 
looks like AssembledChronology class is being used, the iYears instance 
variable is defined as transient.

DateTime.minusYears(int years) call trace:
long instant = getChronology().years().subtract(getMillis(), years);

Not sure how the suggested serializer would help if the variable is transient.
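
My reading (a sketch, not the actual library code) is that this is exactly why a purpose-built serializer helps: instead of copying fields - and losing the transient cached ones - it writes only the primitive state and rebuilds the DateTime through its constructor, which re-creates the chronology and its transient fields:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.joda.time.{DateTime, DateTimeZone}

class DateTimeAsMillisSerializer extends Serializer[DateTime] {
  override def write(kryo: Kryo, output: Output, dt: DateTime): Unit = {
    output.writeLong(dt.getMillis)          // primitive state only
    output.writeString(dt.getZone.getID)
  }
  override def read(kryo: Kryo, input: Input, clazz: Class[DateTime]): DateTime =
    // the constructor rebuilds the chronology, so nothing is left null
    new DateTime(input.readLong(), DateTimeZone.forID(input.readString()))
}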

Thanks,
-Durgesh

> On Jan 14, 2016, at 11:49 AM, Spencer, Alex (Santander) 
>  wrote:
> 
> I appreciate this – thank you.
>  
> I’m not an admin on the box I’m using spark-shell on – so I’m not sure I can 
> add them to that namespace. I’m hoping if I declare the 
> JodaDateTimeSerializer class in my REPL that I can still get this to work. I 
> think the INTERVAL part below may be key, I haven’t tried that yet.
>  
> Kind Regards,
> Alex.
>  
> From: Todd Nist [mailto:tsind...@gmail.com] 
> Sent: 14 January 2016 16:28
> To: Spencer, Alex (Santander)
> Cc: Sean Owen; user@spark.apache.org
> Subject: Re: NPE when using Joda DateTime
>  
> I had a similar problem a while back and leveraged these Kryo serializers, 
> https://github.com/magro/kryo-serializers.  I had to fallback to version 
> 0.28, but that was a while back.  You can add these to the 
> org.apache.spark.serializer.KryoRegistrator
> and then set your registrator in the spark config:
> 
> sparkConfig.
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> .set("spark.kryo.registrator", "com.yourpackage.YourKryoRegistrator")
> ...
> 
> where YourKryoRegistrator is something like:
> 
> class YourKryoRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[org.joda.time.DateTime], new JodaDateTimeSerializer)
> kryo.register(classOf[org.joda.time.Interval], new JodaIntervalSerializer)
>   }
> }
> HTH.
> -Todd
>  
> On Thu, Jan 14, 2016 at 9:28 AM, Spencer, Alex (Santander) 
>  wrote:
> Hi,
> 
> I tried take(1500) and test.collect and these both work on the "single" map 
> statement.
> 
> I'm very new to Kryo serialisation, I managed to find some code and I copied 
> and pasted and that's what originally made the single map statement work:
> 
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[org.joda.time.DateTime])
>   }
> }
> 
> Is it because the groupBy sees a different class type? Maybe Array[DateTime]? 
> I don’t want to find the answer by trial and error though.
> 
> Alex
> 
> -Original Message-
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: 14 January 2016 14:07
> To: Spencer, Alex (Santander)
> Cc: user@spark.apache.org
> Subject: Re: NPE when using Joda DateTime
> 
> It does look somehow like the state of the DateTime object isn't being 
> recreated properly on deserialization somehow, given where the NPE occurs 
> (look at the Joda source code). However the object is java.io.Serializable. 
> Are you sure the Kryo serialization is correct?
> 
> It doesn't quite explain why the map operation works by itself. It could be 
> the difference between executing locally (take(1) will look at 1 partition in 
> 1 task which prefers to be local) and executing remotely (groupBy is going to 
> need a shuffle).
> 
> On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander) 
>  wrote:
> > Hello,
> >
> >
> >
> > I was wondering if somebody is able to help me get to the bottom of a
> > null pointer exception I’m seeing in my code. I’ve managed to narrow
> > down a problem in a larger class to my use of Joda’s DateTime
> > functions. I’ve successfully run my code in scala, but I’ve hit a few
> > problems when adapting it to run in spark.
> >
> >
> >
> > Spark version: 1.3.0
> >
> > Scala version: 2.10.4
> >
> > Java HotSpot 1.7
> >
> >
> >
> > I have a small case class called Transaction, which looks something
> > like
> > this:
> >
> >
> >
> > case class Transaction(date : org.joda.time.DateTime = new
> > org.joda.time.DateTime())
> >
> >
> >
> > I have an RDD[Transactions] trans:
> >
> > org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at
> > :44
> >
> >
> >
> > I am able to run this successfully:
> >
> >
> >
> > val test = trans.map(_.date.minusYears(10))
> >
> > test.take(1)
> >
> >
> >
> > However if I do:
> >
> >
> >
> > val groupedTrans = trans.groupBy(_.account)
> >
> >
> >
> > //For each group, process transactions in turn:
> >
> > val test = groupedTrans.flatMap { case (_, transList) =>
> >
> >   transList.map {transaction =>
> >
> > transaction.date.minusYears(10)
> >
> >   }
> >
> > }
> >
> > test.take(1)
> >
> >
> >
> > I get:
> >
> >
> >
> > java.lang.NullPointerException
> >
> > at org.joda.time.DateTime.minusYears(DateTime.java:1268)
> >
> >
> >
> > Should 

Can we use localIterator when we need to process data in one partition?

2016-01-14 Thread unk1102
Hi, I have a special requirement where I need to process data in one partition at
the end, after doing a lot of filtering, updating, etc. in a DataFrame. Currently,
to process data in one partition I am using coalesce(1), which is killing
performance: my jobs hang for hours, even 5-6 hours, and I don't know how to
solve this. I came across localIterator; will it be helpful in my case? Please
share an example if it is useful, or please share an idea of how to solve this
problem of processing data in one partition only. Please guide.

JavaRDD<Row> maskedRDD =
    sourceRdd.coalesce(1, true).mapPartitionsWithIndex(
        new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
          @Override
          public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
            List<Row> rowList = new ArrayList<>();

            while (rowIterator.hasNext()) {
              Row row = rowIterator.next();
              List<Object> rowAsList =
                  updateRowsMethod(JavaConversions.seqAsJavaList(row.toSeq()));
              Row updatedRow = RowFactory.create(rowAsList.toArray());
              rowList.add(updatedRow);
            }
            return rowList.iterator();
          }
        }, false);
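
A minimal Scala sketch (not from the original message) of the localIterator alternative being asked about; rowDF and updateRow are placeholder names for the poster's DataFrame and row-updating logic, and the idea is to stream one partition at a time to the driver instead of coalescing everything into a single partition:

import org.apache.spark.sql.{DataFrame, Row}

// Iterate over the DataFrame's rows on the driver, one partition at a time,
// without ever repartitioning the data down to a single partition.
def processOnDriver(rowDF: DataFrame, updateRow: Row => Row): Unit = {
  val localRows: Iterator[Row] = rowDF.rdd.toLocalIterator
  localRows.foreach { row =>
    val updated = updateRow(row) // placeholder for the poster's update logic
    // handle `updated` here, e.g. buffer it or write it out from the driver
  }
}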



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-use-localIterator-when-we-need-to-process-data-in-one-partition-tp25974.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: 1.6.0: Standalone application: Getting ClassNotFoundException: org.datanucleus.api.jdo.JDOPersistenceManagerFactory

2016-01-14 Thread Egor Pahomov
My fault, I should have read the documentation more carefully -
http://spark.apache.org/docs/latest/sql-programming-guide.html says precisely
that I need to add these 3 jars to the classpath if I need them. We cannot
include them in the fat jar, because they are OSGi bundles and require
plugin.xml and META-INF/MANIFEST.MF in the root of the jar. The problem is
that there are 3 of them and each one has its own plugin.xml. You could
include them all in the fat jar if you were able to merge the plugin.xml
files, but currently there is no tool to do so: the maven-assembly-plugin
has no such merger, and the maven-shade-plugin has XmlAppendingTransformer,
but for some reason it doesn't work. So that is it - you just have to live
with the fact that you have a fat jar with all dependencies except these 3.
The good news is that in yarn-client mode you only need to add them to the
classpath of your driver; you do not have to call addJar(). That is really
good news, since it's hard to call addJar() properly in an Oozie job.
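
For anyone hitting the same error, a small hedged sketch (not from this mail) that can be run in the driver before creating the HiveContext to confirm whether the DataNucleus classes are visible; the class name is taken from the stack trace quoted below:

// Quick classpath sanity check for the class named in the NestedThrowables trace.
try {
  Class.forName("org.datanucleus.api.jdo.JDOPersistenceManagerFactory")
  println("datanucleus-api-jdo is on the driver classpath")
} catch {
  case _: ClassNotFoundException =>
    println("datanucleus-api-jdo is NOT on the driver classpath")
}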

2016-01-12 17:01 GMT-08:00 Egor Pahomov :

> Hi, I'm moving my infrastructure from 1.5.2 to 1.6.0 and experiencing a
> serious issue. I successfully updated the Spark Thrift Server from 1.5.2 to
> 1.6.0. But I have a standalone application which worked fine with 1.5.2 and
> is failing on 1.6.0 with:
>
> *NestedThrowables:*
> *java.lang.ClassNotFoundException:
> org.datanucleus.api.jdo.JDOPersistenceManagerFactory*
> * at
> javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1175)*
> * at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)*
> * at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)*
>
> Inside this application I work with a Hive table, which has data in JSON
> format.
>
> When I add
>
> <dependency>
>     <groupId>org.datanucleus</groupId>
>     <artifactId>datanucleus-core</artifactId>
>     <version>4.0.0-release</version>
> </dependency>
>
> <dependency>
>     <groupId>org.datanucleus</groupId>
>     <artifactId>datanucleus-api-jdo</artifactId>
>     <version>4.0.0-release</version>
> </dependency>
>
> <dependency>
>     <groupId>org.datanucleus</groupId>
>     <artifactId>datanucleus-rdbms</artifactId>
>     <version>3.2.9</version>
> </dependency>
>
> I'm getting:
>
> *Caused by: org.datanucleus.exceptions.NucleusUserException: Persistence
> process has been specified to use a ClassLoaderResolver of name
> "datanucleus" yet this has not been found by the DataNucleus plugin
> mechanism. Please check your CLASSPATH and plugin specification.*
> * at
> org.datanucleus.AbstractNucleusContext.(AbstractNucleusContext.java:102)*
> * at
> org.datanucleus.PersistenceNucleusContextImpl.(PersistenceNucleusContextImpl.java:162)*
>
> I have CDH 5.5. I built Spark with
>
> *./make-distribution.sh -Pyarn -Phadoop-2.6
> -Dhadoop.version=2.6.0-cdh5.5.0 -Phive -DskipTests*
>
> Then I publish the fat jar locally:
>
> *mvn org.apache.maven.plugins:maven-install-plugin:2.3.1:install-file
> -Dfile=./spark-assembly.jar -DgroupId=org.spark-project
> -DartifactId=my-spark-assembly -Dversion=1.6.0-SNAPSHOT -Dpackaging=jar*
>
> Then I include a dependency on this fat jar:
>
> <dependency>
>     <groupId>org.spark-project</groupId>
>     <artifactId>my-spark-assembly</artifactId>
>     <version>1.6.0-SNAPSHOT</version>
> </dependency>
>
> Then I build my application with the assembly plugin:
>
> <plugin>
>     <groupId>org.apache.maven.plugins</groupId>
>     <artifactId>maven-shade-plugin</artifactId>
>     <configuration>
>         <artifactSet>
>             <includes>
>                 <include>*:*</include>
>             </includes>
>         </artifactSet>
>         <filters>
>             <filter>
>                 <artifact>*:*</artifact>
>                 <excludes>
>                     <exclude>META-INF/*.SF</exclude>
>                     <exclude>META-INF/*.DSA</exclude>
>                     <exclude>META-INF/*.RSA</exclude>
>                 </excludes>
>             </filter>
>         </filters>
>     </configuration>
>     <executions>
>         <execution>
>             <phase>package</phase>
>             <goals>
>                 <goal>shade</goal>
>             </goals>
>             <configuration>
>                 <transformers>
>                     <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
>                     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>                         <resource>META-INF/services/org.apache.hadoop.fs.FileSystem</resource>
>                     </transformer>
>                     <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
>                         <resource>reference.conf</resource>
>                     </transformer>
>                     <transformer implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
>                         <resource>log4j.properties</resource>
>                     </transformer>
>                     <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheLicenseResourceTransformer"/>
>                     <transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer"/>
>                 </transformers>
>             </configuration>
>         </execution>
>     </executions>
> </plugin>
>
> Configuration of the assembly plugin is copy-pasted from the Spark assembly pom.
>
> This workflow worked for 1.5.2 and broke for 1.6.0. If my approach to creating
> this standalone application is not a good one, please recommend another
> approach, but spark-submit does not work for me - it is hard for me to connect
> it to Oozie.
>
> Any suggestion would be 

Set Hadoop User in Spark Shell

2016-01-14 Thread Daniel Valdivia
Hi,

I'm trying to set the value of a Hadoop parameter within spark-shell, and
System.setProperty("HADOOP_USER_NAME", "hadoop") does not seem to be doing the trick.

Does anyone know how I can set the hadoop.job.ugi parameter from within
spark-shell?

Cheers
-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
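
A hedged sketch of two things that may help here (neither is from the original message, and whether hadoop.job.ugi is still honored depends on the Hadoop version in use): HADOOP_USER_NAME is read from the process environment at JVM startup, so it needs to be exported before launching spark-shell rather than set with System.setProperty; alternatively, Hadoop configuration keys can be set on the SparkContext's Hadoop configuration:

// Option 1 (shell, shown as a comment since it is not Scala):
//   export HADOOP_USER_NAME=hadoop && spark-shell

// Option 2 (inside spark-shell): set the Hadoop config key directly.
sc.hadoopConfiguration.set("hadoop.job.ugi", "hadoop")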



Re: SQL UDF problem (with re to types)

2016-01-14 Thread Michael Armbrust
I don't believe that Java 8 got rid of erasure. In fact, I think it's
actually worse when you use Java 8 lambdas.

On Thu, Jan 14, 2016 at 10:54 AM, Raghu Ganti  wrote:

> Would this go away if the Spark source was compiled against Java 1.8
> (since the problem of type erasure is solved through proper generics
> implementation in Java 1.8)?
>
> On Thu, Jan 14, 2016 at 1:42 PM, Michael Armbrust 
> wrote:
>
>> We automatically convert types for UDFs defined in Scala, but we can't do
>> it in Java because the types are erased by the compiler.  If you want to
>> use double you should cast before calling the UDF.
>>
>> On Wed, Jan 13, 2016 at 8:10 PM, Raghu Ganti 
>> wrote:
>>
>>> So, when I try BigDecimal, it works. But, should it not parse based on
>>> what the UDF defines? Am I missing something here?
>>>
>>> On Wed, Jan 13, 2016 at 4:57 PM, Ted Yu  wrote:
>>>
 Please take a look
 at 
 sql/hive/src/test/java/org/apache/spark/sql/hive/aggregate/MyDoubleSum.java
 which shows a UserDefinedAggregateFunction that works on DoubleType column.

 sql/hive/src/test/java/org/apache/spark/sql/hive/JavaDataFrameSuite.java
 shows how it is registered.

 Cheers

 On Wed, Jan 13, 2016 at 11:58 AM, raghukiran 
 wrote:

> While registering and using SQL UDFs, I am running into the following
> problem:
>
> UDF registered:
>
> ctx.udf().register("Test", new UDF1<Double, String>() {
>     private static final long serialVersionUID = -8231917155671435931L;
>
>     public String call(Double x) throws Exception {
>         return "testing";
>     }
> }, DataTypes.StringType);
>
> Usage:
> query = "SELECT Test(82.4)";
> result = sqlCtx.sql(query).first();
> System.out.println(result.toString());
>
> Problem: Class Cast exception thrown
> Caused by: java.lang.ClassCastException: java.math.BigDecimal cannot
> be cast
> to java.lang.Double
>
> This problem occurs with Spark v1.5.2 and 1.6.0.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SQL-UDF-problem-with-re-to-types-tp25968.html
> Sent from the Apache Spark User List mailing list archive at
> Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>

>>>
>>
>
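
A minimal sketch of the cast-before-calling-the-UDF workaround Michael describes, assuming the SQLContext is named sqlCtx and the "Test" UDF from the original post is already registered; the SQL literal 82.4 is parsed as a decimal, so it is cast to DOUBLE before it reaches the UDF1<Double, String>:

val result = sqlCtx.sql("SELECT Test(CAST(82.4 AS DOUBLE))").first()
println(result.toString)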


Re: 101 question on external metastore

2016-01-14 Thread Yana Kadiyska
If you have a second could you post the version of derby that you
installed, the contents of  hive-site.xml and the command you use to run
(along with spark version?). I'd like to retry the installation.

On Thu, Jan 7, 2016 at 7:35 AM, Deenar Toraskar 
wrote:

> I sorted this out. There were 2 different version of derby and ensuring
> the metastore and spark used the same version of Derby made the problem go
> away.
>
> Deenar
>
> On 6 January 2016 at 02:55, Yana Kadiyska  wrote:
>
>> Deenar, I have not resolved this issue. Why do you think it's from
>> different versions of Derby? I was playing with this as a fun experiment
>> and my setup was on a clean machine -- no other versions of
>> hive/hadoop/etc...
>>
>> On Sun, Dec 20, 2015 at 12:17 AM, Deenar Toraskar <
>> deenar.toras...@gmail.com> wrote:
>>
>>> Apparently it is down to different versions of Derby on the classpath,
>>> but I am unsure where the other version is coming from. The setup worked
>>> perfectly with Spark 1.3.1.
>>>
>>> Deenar
>>>
>>> On 20 December 2015 at 04:41, Deenar Toraskar >> > wrote:
>>>
 Hi Yana/All

 I am getting the same exception. Did you make any progress?

 Deenar

 On 5 November 2015 at 17:32, Yana Kadiyska 
 wrote:

> Hi folks, trying experiment with a minimal external metastore.
>
> I am following the instructions here:
> https://cwiki.apache.org/confluence/display/Hive/HiveDerbyServerMode
>
> I grabbed Derby 10.12.1.1 and started an instance, verified I can
> connect via ij tool and that process is listening on 1527
>
> put the following hive-site.xml under conf
> ```
> <?xml version="1.0"?>
> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
> <configuration>
> <property>
>   <name>javax.jdo.option.ConnectionURL</name>
>   <value>jdbc:derby://localhost:1527/metastore_db;create=true</value>
>   <description>JDBC connect string for a JDBC metastore</description>
> </property>
> <property>
>   <name>javax.jdo.option.ConnectionDriverName</name>
>   <value>org.apache.derby.jdbc.ClientDriver</value>
>   <description>Driver class name for a JDBC metastore</description>
> </property>
> </configuration>
> ```
>
> I then try to run spark-shell thusly:
> bin/spark-shell --driver-class-path
> /home/yana/db-derby-10.12.1.1-bin/lib/derbyclient.jar
>
> and I get an ugly stack trace like so...
>
> Caused by: java.lang.NoClassDefFoundError: Could not initialize class
> org.apache.derby.jdbc.EmbeddedDriver
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
> Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at java.lang.Class.newInstance(Class.java:379)
> at
> org.datanucleus.store.rdbms.connectionpool.AbstractConnectionPoolFactory.loadDriver(AbstractConnectionPoolFactory.java:47)
> at
> org.datanucleus.store.rdbms.connectionpool.DBCPConnectionPoolFactory.createConnectionPool(DBCPConnectionPoolFactory.java:50)
> at
> org.datanucleus.store.rdbms.ConnectionFactoryImpl.generateDataSources(ConnectionFactoryImpl.java:238)
> at
> org.datanucleus.store.rdbms.ConnectionFactoryImpl.initialiseDataSources(ConnectionFactoryImpl.java:131)
> at
> org.datanucleus.store.rdbms.ConnectionFactoryImpl.(ConnectionFactoryImpl.java:85)
> ... 114 more
>
> :10: error: not found: value sqlContext
>import sqlContext.implicits._
>
>
> What am I doing wrong? I'm not sure why it's looking for anything Embedded;
> I'm specifically trying not to use the embedded server... but I know my
> hive-site.xml is being read, since starting without --driver-class-path does
> say it can't load org.apache.derby.jdbc.ClientDriver
>


>>>
>>
>
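
For anyone retrying this setup, a small hedged sketch (not from the thread) that can be pasted into spark-shell to see where the Derby client driver is actually loaded from, which helps confirm or rule out the two-Derby-versions diagnosis above:

// Prints the jar (or directory) that org.apache.derby.jdbc.ClientDriver came from.
val source = Class.forName("org.apache.derby.jdbc.ClientDriver")
  .getProtectionDomain.getCodeSource
println(if (source != null) source.getLocation else "loaded from the bootstrap classpath")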


Re: Usage of SparkContext within a Web container

2016-01-14 Thread Eugene Morozov
Praveen,

Zeppelin uses Spark's REPL.

I'm currently writing an app that is a web service which is going to run
Spark jobs. At the init stage I just create a JavaSparkContext and then use
it for all user requests. The web service is stateless. The issue with
stateless is that it's possible to run several instances of the web service,
but each of them will have a separate JavaSparkContext, which means they are
going to compete for resources as different applications, although they
ought to look like just one application. I'm pretty sure it's possible to
use pools, but I haven't tried it yet. I see no other cons... or pros, for
that matter.

The way you're going to use it, I'd say, depends on whether users are going
to provide their own code. If that's the case, then you're probably better
off with Zeppelin's approach. If not, then my assumption is that using a
SparkContext directly for processing is simpler.

--
Be well!
Jean Morozov

On Thu, Jan 14, 2016 at 10:44 AM, praveen S  wrote:

> Is using a SparkContext from a web container the right way to process Spark
> jobs, or should we use spark-submit in a ProcessBuilder?
>
> Are there any pros or cons of using a SparkContext from a web container?
>
> How does Zeppelin trigger Spark jobs from the web context?
>
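
A minimal Scala sketch of the pattern Eugene describes, with one context created at init time and shared by all request handlers; the object name, app name and master URL are placeholders, not from the thread:

import org.apache.spark.{SparkConf, SparkContext}

object SharedSparkContext {
  // Created lazily on first use and then reused by every request handler.
  lazy val sc: SparkContext = {
    val conf = new SparkConf()
      .setAppName("web-service-spark")       // placeholder app name
      .setMaster("spark://master-host:7077") // placeholder master URL
    new SparkContext(conf)
  }
}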


Re: code hangs in local master mode

2016-01-14 Thread Ted Yu
Can you capture one or two stack traces of the local master process and
pastebin them ?

Thanks

On Thu, Jan 14, 2016 at 6:01 AM, Kai Wei  wrote:

> Hi list,
>
> I ran into an issue which I think could be a bug.
>
> I have a Hive table stored as Parquet files. Let's say it's called
> testtable. I found that the code below gets stuck forever in spark-shell with a
> local master or local driver/executor:
> sqlContext.sql("select * from testtable").rdd.cache.zipWithIndex().count
>
> But it works if I use a standalone master.
>
> I also tried several different variants:
> don't cache the RDD (works):
> sqlContext.sql("select * from testtable").rdd.zipWithIndex().count
>
> cache the RDD after zipWithIndex (works):
> sqlContext.sql("select * from testtable").rdd.zipWithIndex().cache.count
>
> use the Parquet file reader (doesn't work):
>
> sqlContext.read.parquet("hdfs://localhost:8020/user/hive/warehouse/testtable").rdd.cache.zipWithIndex().count
>
> use Parquet files on the local file system (works):
> sqlContext.read.parquet("/tmp/testtable").rdd.cache.zipWithIndex().count
>
> I read the code of zipWithIndex() and the DAG visualization. I think the
> function causes Spark to first retrieve n-1 partitions of the target table
> and cache them, and then the last partition. Something must be going wrong when
> the driver/executor tries to read the last partition from HDFS.
>
> I am using spark-1.5.2-bin-hadoop-2.6 on cloudera quickstart vm 5.4.2.
>
> --
> Kai Wei
> Big Data Developer
>
> Pythian - love your data
>
> w...@pythian.com
> Tel: +1 613 565 8696 x1579
> Mobile: +61 403 572 456
>
> --
>
>
>
>


Re: NPE when using Joda DateTime

2016-01-14 Thread Todd Nist
I had a similar problem a while back and leveraged these Kryo serializers,
https://github.com/magro/kryo-serializers.  I had to fall back to version
0.28, but that was a while back.  You can add these to the

org.apache.spark.serializer.KryoRegistrator

and then set your registrator in the spark config:

sparkConfig.
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.set("spark.kryo.registrator", "com.yourpackage.YourKryoRegistrator")
...

where YourKryoRegistrator is something like:

class YourKryoRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) {
kryo.register(classOf[org.joda.time.DateTime], new
JodaDateTimeSerializer)
kryo.register(classOf[org.joda.time.Interval], new
JodaIntervalSerializer)
  }
}

HTH.

-Todd

On Thu, Jan 14, 2016 at 9:28 AM, Spencer, Alex (Santander) <
alex.spen...@santander.co.uk.invalid> wrote:

> Hi,
>
> I tried take(1500) and test.collect and these both work on the "single"
> map statement.
>
> I'm very new to Kryo serialisation, I managed to find some code and I
> copied and pasted and that's what originally made the single map statement
> work:
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[org.joda.time.DateTime])
>   }
> }
>
> Is it because the groupBy sees a different class type? Maybe
> Array[DateTime]? I don’t want to find the answer by trial and error though.
>
> Alex
>
> -Original Message-
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: 14 January 2016 14:07
> To: Spencer, Alex (Santander)
> Cc: user@spark.apache.org
> Subject: Re: NPE when using Joda DateTime
>
> It does look somehow like the state of the DateTime object isn't being
> recreated properly on deserialization somehow, given where the NPE occurs
> (look at the Joda source code). However the object is java.io.Serializable.
> Are you sure the Kryo serialization is correct?
>
> It doesn't quite explain why the map operation works by itself. It could
> be the difference between executing locally (take(1) will look at 1
> partition in 1 task which prefers to be local) and executing remotely
> (groupBy is going to need a shuffle).
>
> On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander)
>  wrote:
> > Hello,
> >
> >
> >
> > I was wondering if somebody is able to help me get to the bottom of a
> > null pointer exception I’m seeing in my code. I’ve managed to narrow
> > down a problem in a larger class to my use of Joda’s DateTime
> > functions. I’ve successfully run my code in scala, but I’ve hit a few
> > problems when adapting it to run in spark.
> >
> >
> >
> > Spark version: 1.3.0
> >
> > Scala version: 2.10.4
> >
> > Java HotSpot 1.7
> >
> >
> >
> > I have a small case class called Transaction, which looks something
> > like
> > this:
> >
> >
> >
> > case class Transaction(date : org.joda.time.DateTime = new
> > org.joda.time.DateTime())
> >
> >
> >
> > I have an RDD[Transactions] trans:
> >
> > org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at
> > :44
> >
> >
> >
> > I am able to run this successfully:
> >
> >
> >
> > val test = trans.map(_.date.minusYears(10))
> >
> > test.take(1)
> >
> >
> >
> > However if I do:
> >
> >
> >
> > val groupedTrans = trans.groupBy(_.account)
> >
> >
> >
> > //For each group, process transactions in turn:
> >
> > val test = groupedTrans.flatMap { case (_, transList) =>
> >
> >   transList.map {transaction =>
> >
> > transaction.date.minusYears(10)
> >
> >   }
> >
> > }
> >
> > test.take(1)
> >
> >
> >
> > I get:
> >
> >
> >
> > java.lang.NullPointerException
> >
> > at org.joda.time.DateTime.minusYears(DateTime.java:1268)
> >
> >
> >
> > Should the second operation not be equivalent to the first .map one?
> > (It’s a long way round of producing my error – but it’s extremely
> > similar to what’s happening in my class).
> >
> >
> >
> > I’ve got a custom registration class for Kryo which I think is working
> > - before I added this the original .map did not work – but shouldn’t
> > it be able to serialize all instances of Joda DateTime?
> >
> >
> >
> > Thank you for any help / pointers you can give me.
> >
> >
> >
> > Kind Regards,
> >
> > Alex.
> >
> >
> >
> > Alex Spencer
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional
> commands, e-mail: user-h...@spark.apache.org
>
> Emails aren't always secure, and they may be intercepted or changed after
> they've been sent. Santander doesn't accept liability if this happens. If
> you
> think someone may have interfered with this email, please get in touch
> with the
> sender another way. This message doesn't create or change any contract.
> Santander doesn't accept responsibility for damage caused by any viruses
> contained in this email or its attachments. Emails may be monitored. If
> you've
> 

Spark 1.5.2 streaming driver in YARN cluster mode on Hadoop 2.6 (on EMR 4.2) restarts after stop

2016-01-14 Thread Roberto Coluccio
Hi there,

I'm facing a weird issue when upgrading from Spark 1.4.1 streaming driver
on EMR 3.9 (hence Hadoop 2.4.0) to Spark 1.5.2 on EMR 4.2 (hence Hadoop
2.6.0).

Basically, the very same driver which used to terminate after a timeout as
expected now does not. In particular, as far as the driver's logs tell me,
the StreamingContext seems to be stopped successfully (and with exit code 0),
but the Hadoop/YARN job does not terminate/complete. Instead, after hanging
for a couple of minutes, the driver just seems to start its processing again!
Here follows an example log stack collected during the stop.

16/01/12 19:17:32 INFO ApplicationMaster: Final app status: SUCCEEDED,
> exitCode: 0
> 16/01/12 19:17:32 INFO StreamingContext: Invoking
> stop(stopGracefully=true) from shutdown hook
> 16/01/12 19:17:32 INFO ReceiverTracker: Sent stop signal to all 3 receivers
> 16/01/12 19:17:32 ERROR ReceiverTracker: Deregistered receiver for stream
> 1: Stopped by driver
> 16/01/12 19:17:33 ERROR ReceiverTracker: Deregistered receiver for stream
> 2: Stopped by driver
> 16/01/12 19:17:33 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID
> 97) in 1200804 ms on ip-172-31-9-4.ec2.internal (1/1)
> 16/01/12 19:17:33 INFO DAGScheduler: ResultStage 8 (start at
> NetflowStreamingApp.scala:68) finished in 1200.806 s
> 16/01/12 19:17:33 INFO YarnClusterScheduler: Removed TaskSet 8.0, whose
> tasks have all completed, from pool
> 16/01/12 19:17:33 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Stopped by driver
> 16/01/12 19:17:34 INFO TaskSetManager: Finished task 0.0 in stage 12.0
> (TID 101) in 1199753 ms on ip-172-31-9-4.ec2.internal (1/1)
> 16/01/12 19:17:34 INFO YarnClusterScheduler: Removed TaskSet 12.0, whose
> tasks have all completed, from pool
> 16/01/12 19:17:34 INFO DAGScheduler: ResultStage 12 (start at
> NetflowStreamingApp.scala:68) finished in 1199.753 s
> 16/01/12 19:17:34 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID
> 96) in 1201854 ms on ip-172-31-9-5.ec2.internal (1/1)
> 16/01/12 19:17:34 INFO DAGScheduler: ResultStage 7 (start at
> NetflowStreamingApp.scala:68) finished in 1201.855 s
> 16/01/12 19:17:34 INFO YarnClusterScheduler: Removed TaskSet 7.0, whose
> tasks have all completed, from pool
> 16/01/12 19:17:34 INFO ReceiverTracker: Waiting for receiver job to
> terminate gracefully
> 16/01/12 19:17:34 INFO ReceiverTracker: Waited for receiver job to
> terminate gracefully
> 16/01/12 19:17:34 INFO ReceiverTracker: All of the receivers have
> deregistered successfully
> 16/01/12 19:17:34 INFO WriteAheadLogManager : Stopped write ahead log
> manager
> 16/01/12 19:17:34 INFO ReceiverTracker: ReceiverTracker stopped
> 16/01/12 19:17:34 INFO JobGenerator: Stopping JobGenerator gracefully
> 16/01/12 19:17:34 INFO JobGenerator: Waiting for all received blocks to be
> consumed for job generation


The "receivers" mentioned in the logs are the Kinesis streams receivers.

In my Scala 2.10-based driver, I just use the
StreamingContext.awaitTerminationOrTimeout(timeout) API (called right
after StreamingContext.start()) and set
spark.streaming.stopGracefullyOnShutdown=true in the SparkConf.
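
For reference, a hedged sketch of that shutdown pattern (ssc and timeoutMs are placeholder names); awaitTerminationOrTimeout only waits and returns false when the timeout elapses, so an explicit stop() is still needed afterwards:

ssc.start()
// Returns true if the context was stopped, false if the timeout elapsed first.
val stopped = ssc.awaitTerminationOrTimeout(timeoutMs)
if (!stopped) {
  ssc.stop(stopSparkContext = true, stopGracefully = true)
}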

Did anybody experience anything similar?

Any help would be appreciated.

Thanks,

Roberto


Re: NPE when using Joda DateTime

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you try to use "Kryo.setDefaultSerializer" like this:

class YourKryoRegistrator extends KryoRegistrator {

  override def registerClasses(kryo: Kryo) {

kryo.setDefaultSerializer(classOf[com.esotericsoftware.kryo.serializers.JavaSerializer])
  }
}
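
Combining the two suggestions in this thread into one hedged sketch: register the Joda classes with the serializers from the kryo-serializers project mentioned above and keep JavaSerializer as the default fallback (the de.javakaffee package names come from that project and are assumed to be on the classpath):

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.serializers.JavaSerializer
import de.javakaffee.kryoserializers.jodatime.{JodaDateTimeSerializer, JodaIntervalSerializer}
import org.apache.spark.serializer.KryoRegistrator

class CombinedKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo) {
    // Fall back to plain Java serialization for anything not explicitly registered.
    kryo.setDefaultSerializer(classOf[JavaSerializer])
    kryo.register(classOf[org.joda.time.DateTime], new JodaDateTimeSerializer)
    kryo.register(classOf[org.joda.time.Interval], new JodaIntervalSerializer)
  }
}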


On Thu, Jan 14, 2016 at 12:54 PM, Durgesh Verma  wrote:

> Today is my day... Trying to figure out where I can pitch in. Let me know if
> the below makes sense.
>
> I looked at the Joda Java API source code (1.2.9) and traced that call in the NPE.
> It looks like the AssembledChronology class is being used, and the iYears instance
> variable is defined as transient.
>
> DateTime.minusYears(int years) call trace:
> long instant = getChronology().years().subtract(getMillis(), years);
>
> Not sure how the suggested serializer would help if the variable is transient.
>
> Thanks,
> -Durgesh
>
> On Jan 14, 2016, at 11:49 AM, Spencer, Alex (Santander) <
> alex.spen...@santander.co.uk.INVALID
> > wrote:
>
> I appreciate this – thank you.
>
>
>
> I’m not an admin on the box I’m using spark-shell on – so I’m not sure I
> can add them to that namespace. I’m hoping if I declare the
> JodaDateTimeSerializer class in my REPL that I can still get this to work.
> I think the INTERVAL part below may be key, I haven’t tried that yet.
>
>
>
> Kind Regards,
>
> Alex.
>
>
>
> *From:* Todd Nist [mailto:tsind...@gmail.com ]
> *Sent:* 14 January 2016 16:28
> *To:* Spencer, Alex (Santander)
> *Cc:* Sean Owen; user@spark.apache.org
> *Subject:* Re: NPE when using Joda DateTime
>
>
>
> I had a similar problem a while back and leveraged these Kryo serializers,
> https://github.com/magro/kryo-serializers.  I had to fall back to version
> 0.28, but that was a while back.  You can add these to the
>
> org.apache.spark.serializer.KryoRegistrator
>
> and then set your registrator in the spark config:
>
> sparkConfig.
> .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
> .set("spark.kryo.registrator", "com.yourpackage.YourKryoRegistrator")
> ...
>
> where YourKryoRegistrator is something like:
>
> class YourKryoRegistrator extends KryoRegistrator {
>
>   override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[org.joda.time.DateTime], new
> JodaDateTimeSerializer)
> kryo.register(classOf[org.joda.time.Interval], new
> JodaIntervalSerializer)
>   }
> }
>
> HTH.
>
> -Todd
>
>
>
> On Thu, Jan 14, 2016 at 9:28 AM, Spencer, Alex (Santander) <
> alex.spen...@santander.co.uk.invalid> wrote:
>
> Hi,
>
> I tried take(1500) and test.collect and these both work on the "single"
> map statement.
>
> I'm very new to Kryo serialisation, I managed to find some code and I
> copied and pasted and that's what originally made the single map statement
> work:
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
> kryo.register(classOf[org.joda.time.DateTime])
>   }
> }
>
> Is it because the groupBy sees a different class type? Maybe
> Array[DateTime]? I don’t want to find the answer by trial and error though.
>
> Alex
>
> -Original Message-
> From: Sean Owen [mailto:so...@cloudera.com]
> Sent: 14 January 2016 14:07
> To: Spencer, Alex (Santander)
> Cc: user@spark.apache.org
> Subject: Re: NPE when using Joda DateTime
>
> It does look somehow like the state of the DateTime object isn't being
> recreated properly on deserialization somehow, given where the NPE occurs
> (look at the Joda source code). However the object is java.io.Serializable.
> Are you sure the Kryo serialization is correct?
>
> It doesn't quite explain why the map operation works by itself. It could
> be the difference between executing locally (take(1) will look at 1
> partition in 1 task which prefers to be local) and executing remotely
> (groupBy is going to need a shuffle).
>
> On Thu, Jan 14, 2016 at 1:01 PM, Spencer, Alex (Santander) <
> alex.spen...@santander.co.uk.invalid> wrote:
> > Hello,
> >
> >
> >
> > I was wondering if somebody is able to help me get to the bottom of a
> > null pointer exception I’m seeing in my code. I’ve managed to narrow
> > down a problem in a larger class to my use of Joda’s DateTime
> > functions. I’ve successfully run my code in scala, but I’ve hit a few
> > problems when adapting it to run in spark.
> >
> >
> >
> > Spark version: 1.3.0
> >
> > Scala version: 2.10.4
> >
> > Java HotSpot 1.7
> >
> >
> >
> > I have a small case class called Transaction, which looks something
> > like
> > this:
> >
> >
> >
> > case class Transaction(date : org.joda.time.DateTime = new
> > org.joda.time.DateTime())
> >
> >
> >
> > I have an RDD[Transactions] trans:
> >
> > org.apache.spark.rdd.RDD[Transaction] = MapPartitionsRDD[4] at map at
> > :44
> >
> >
> >
> > I am able to run this successfully:
> >
> >
> >
> > val test = trans.map(_.date.minusYears(10))
> >
> > test.take(1)
> >
> >
> >
> > However if I do:
> >
> >
> >
> > val groupedTrans = 

Re: Re: Re: spark streaming context trigger invoke stop why?

2016-01-14 Thread Shixiong(Ryan) Zhu
Could you show your code? Did you use `StreamingContext.awaitTermination`?
If so, it will return if any exception happens.

On Wed, Jan 13, 2016 at 11:47 PM, Triones,Deng(vip.com) <
triones.d...@vipshop.com> wrote:

> What's more, I am running a 24/7 job, so I won't call System.exit()
> myself. So I believe the driver kills itself somewhere.
>
>
>
> *From:* 邓刚 [Technology Center]
> *Sent:* 14 January 2016 15:45
> *To:* 'Yogesh Mahajan'
> *Cc:* user
> *Subject:* Re: Re: spark streaming context trigger invoke stop why?
>
>
>
> Thanks for your response. ApplicationMaster is only for YARN mode; I am
> using standalone mode. Could you kindly let me know what triggers
> the shutdown hook?
>
>
>
> *From:* Yogesh Mahajan [mailto:ymaha...@snappydata.io]
> *Sent:* 14 January 2016 12:42
> *To:* 邓刚 [Technology Center]
> *Cc:* user
> *Subject:* Re: Re: spark streaming context trigger invoke stop why?
>
>
>
> All the action happens in ApplicationMaster, especially in the run method.
>
> Check ApplicationMaster#startUserApplication: it starts the userThread (driver),
> which invokes the ApplicationMaster#finish method. You can also try System.exit in
> your program.
>
>
>
> Regards,
>
> Yogesh Mahajan,
>
> SnappyData Inc, snappydata.io
>
>
>
> On Thu, Jan 14, 2016 at 9:56 AM, Yogesh Mahajan 
> wrote:
>
> Hi Triones,
>
>
>
> Check org.apache.spark.util.ShutdownHookManager: it adds this
> shutdown hook when you start a StreamingContext
>
>
>
> Here is the code in StreamingContext.start()
>
>
>
> shutdownHookRef = ShutdownHookManager.addShutdownHook(
>
>   StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
>
>
>
> Also look at the following def in StreamingContext, which actually stops
> the context from the shutdown hook:
>
> private def stopOnShutdown(): Unit = {
>
> val stopGracefully =
> conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)
>
> logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown
> hook")
>
> // Do not stop SparkContext, let its own shutdown hook stop it
>
> stop(stopSparkContext = false, stopGracefully = stopGracefully)
>
> }
>
>
>
> Regards,
>
> Yogesh Mahajan,
>
> SnappyData Inc, snappydata.io
>
>
>
> On Thu, Jan 14, 2016 at 8:55 AM, Triones,Deng(vip.com) <
> triones.d...@vipshop.com> wrote:
>
> More info
>
>
>
> I am using spark version 1.5.2
>
>
>
>
>
> *From:* Triones,Deng(vip.com) [mailto:triones.d...@vipshop.com]
> *Sent:* 14 January 2016 11:24
> *To:* user
> *Subject:* spark streaming context trigger invoke stop why?
>
>
>
> Hi all
>
>  As I saw in the driver log, a task failed 4 times in a stage; the
> stage was dropped because the input block was deleted before it could be used.
> After that the StreamingContext invoked stop.  Does anyone know what kind of
> Akka message triggers the stop, or which code triggers the shutdown hook?
>
>
>
>
>
> Thanks
>
>
>
>
>
>
>
>
>
> Driver log:
>
>
>
>  Job aborted due to stage failure: Task 410 in stage 215.0 failed 4 times
>
> [org.apache.spark.streaming.StreamingContext---Thread-0]: Invoking
> stop(stopGracefully=false) from shutdown hook
>
>
> This communication is intended only for the addressee(s) and may contain
> information that is privileged and confidential. You are hereby notified
> that, if you are not an intended recipient listed above, or an authorized
> employee or agent of an addressee of this communication responsible for
> delivering e-mail messages to an intended recipient, any dissemination,
> distribution or reproduction of this communication (including any
> attachments hereto) is strictly prohibited. If you have received this
> communication in error, please notify us immediately by a reply e-mail
> addressed to the sender and permanently delete the original e-mail
> communication and any attachments from all storage devices without making
> or otherwise retaining a copy.
>
>
>
>
>
>