Re: Meets "java.lang.IllegalArgumentException" when test spark ml pipe with DecisionTreeClassifier

2015-09-09 Thread Terry Hole
Sean,

Thank you!

Finally, I got this to work, although it is a bit ugly: manually setting the
metadata of the dataframe.

import org.apache.spark.ml.attribute._
import org.apache.spark.sql.types._
val df = training.toDF()
val schema = df.schema
val rowRDD = df.rdd
def enrich(m : Metadata) : Metadata = {
  val na = NominalAttribute.defaultAttr.withValues("0", "1")
  na.toMetadata(m)
}
val newSchema = StructType(schema.map(f =>
  if (f.name == "label") f.copy(metadata = enrich(f.metadata)) else f))
val model = pipeline.fit(sqlContext.createDataFrame(rowRDD, newSchema))
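
A less manual alternative (just a sketch; it assumes the raw label is kept as a
string column named "labelStr", and reuses the stage names from the standard
pipeline example) is to let a StringIndexer stage attach the nominal metadata:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.StringIndexer

// StringIndexer emits a Double "label" column that already carries the
// class (nominal) metadata the DecisionTreeClassifier expects.
val labelIndexer = new StringIndexer()
  .setInputCol("labelStr")   // hypothetical raw label column
  .setOutputCol("label")

// tokenizer, hashingTF and lr (the tree) are assumed to be the existing stages.
val pipeline = new Pipeline().setStages(Array(labelIndexer, tokenizer, hashingTF, lr))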

Thanks!
- Terry

On Mon, Sep 7, 2015 at 4:24 PM, Sean Owen  wrote:

> Hm, off the top of my head I don't know. I haven't looked at this
> aspect in a while, strangely. It's an attribute in the metadata of the
> field. I assume there's a method for setting this metadata when you
> construct the input data.
>
> On Sun, Sep 6, 2015 at 10:41 AM, Terry Hole  wrote:
> > Sean
> >
> > Do you know how to tell decision tree that the "label" is a binary or set
> > some attributes to dataframe to carry number of classes?
> >
> > Thanks!
> > - Terry
> >
> > On Sun, Sep 6, 2015 at 5:23 PM, Sean Owen  wrote:
> >>
> >> (Sean)
> >> The error suggests that the type is not a binary or nominal attribute
> >> though. I think that's the missing step. A double-valued column need
> >> not be one of these attribute types.
> >>
> >> On Sun, Sep 6, 2015 at 10:14 AM, Terry Hole 
> wrote:
> >> > Hi, Owen,
> >> >
> >> > The dataframe "training" is from a RDD of case class:
> >> > RDD[LabeledDocument],
> >> > while the case class is defined as this:
> >> > case class LabeledDocument(id: Long, text: String, label: Double)
> >> >
> >> > So there is already a default "label" column of type double.
> >> >
> >> > I already tried to set the label column for decision tree as this:
> >> > val lr = new
> >> >
> >> >
> DecisionTreeClassifier().setMaxDepth(5).setMaxBins(32).setImpurity("gini").setLabelCol("label")
> >> > It raised the same error.
> >> >
> >> > I also tried changing the "label" column to "int" type, but it reported an
> >> > error like the following stack trace. I have no idea how to make this work.
> >> >
> >> > java.lang.IllegalArgumentException: requirement failed: Column label
> >> > must be
> >> > of type DoubleType but was actually IntegerType.
> >> > at scala.Predef$.require(Predef.scala:233)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:37)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:53)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
> >> > at
> >> > org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> > at
> >> >
> >> >
> org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:162)
> >> > at
> >> >
> >> >
> scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
> >> > at
> >> >
> >> >
> scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
> >> > at
> >> > scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
> >> > at
> >> > org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:162)
> >> > at
> >> > org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:59)
> >> > at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:116)
> >> > at
> >> >
> >> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:51)
> >> > at
> >> >
> >> >
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:56)
> >> > at
> >> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:58)
> >> > at
> >> > $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:60)
> >> > at
> $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:62)
> >> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:64)
> >> > at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:66)
> >> > at $iwC$$iwC$$iwC$$iwC$$iwC.(:68)
> >> > at $iwC$$iwC$$iwC$$iwC.(:70)
> >> > at $iwC$$iwC$$iwC.(:72)
> >> > at $iwC$$iwC.(:74)
> >> > at $iwC.(:76)
> >> > at (:78)
> >> > at .(:82)
> >> > at .()
> >> > at .(:7)
> >> > at .()
> >> > at $print()
> >> >
> >> > Thanks!
> >> > - Terry
> >> >
> >> > On Sun, Sep 6, 2015 at 4:53 PM, Sean Owen  wrote:
> >> >>
> >> >> I think somewhere along the line you've not specified your label
> >> >> column -- it's defaulting to "label" and it does not recognize it, or
> >> >> at least not as a binary or nominal attribute.
> >> >>
> >> >> On Sun, Sep 6, 2015 at 5:47 AM, Terry Hole 
> 

Re: Contribution in Apache Spark

2015-09-09 Thread Akhil Das
Have a look
https://issues.apache.org/jira/browse/spark/?selectedTab=com.atlassian.jira.jira-projects-plugin:summary-panel

Thanks
Best Regards

On Wed, Sep 9, 2015 at 9:50 AM, Chintan Bhatt <
chintanbhatt...@charusat.ac.in> wrote:

> I want to contribute to Apache Spark, especially to MLlib.
> Please suggest any open issue/use case/problem.
>
> --
> CHINTAN BHATT 
> Assistant Professor,
> U & P U Patel Department of Computer Engineering,
> Chandubhai S. Patel Institute of Technology,
> Charotar University of Science And Technology (CHARUSAT),
> Changa-388421, Gujarat, INDIA.
> http://www.charusat.ac.in
> *Personal Website*: https://sites.google.com/a/ecchanga.ac.in/chintan/
>
>


Re: hadoop2.6.0 + spark1.4.1 + python2.7.10

2015-09-09 Thread Ashish Dutt
Dear Sasha,

What I did was that I installed the parcels on all the nodes of the
cluster. Typically the location was
/opt/cloudera/parcels/CDH5.4.2-1.cdh5.4.2.p0.2
Hope this helps you.

With regards,
Ashish



On Tue, Sep 8, 2015 at 10:18 PM, Sasha Kacanski  wrote:

> Hi Ashish,
> Thanks for the update.
> I tried all of it, but what I don't get is that I run the cluster with one
> node, so presumably I should have the PySpark binaries there, as I am
> developing on the same host.
> Could you tell me where you placed the parcels or whatever Cloudera is using.
> My understanding of YARN and Spark is that these binaries get compressed
> and packaged with Java to be pushed to the worker node.
> Regards,
> On Sep 7, 2015 9:00 PM, "Ashish Dutt"  wrote:
>
>> Hello Sasha,
>>
>> I have no answer for debian. My cluster is on Linux and I'm using CDH 5.4
>> Your question-  "Error from python worker:
>>   /cube/PY/Python27/bin/python: No module named pyspark"
>>
>> On a single node (ie one server/machine/computer) I installed pyspark
>> binaries and it worked. Connected it to pycharm and it worked too.
>>
>> Next I tried executing the pyspark command on another node (say the worker)
>> in the cluster and I got this error message: "Error from python worker:
>> PATH: No module named pyspark".
>>
>> My first guess was that the worker was not picking up the path of the pyspark
>> binaries installed on the server (I tried many things, like hard-coding
>> the pyspark path in the config.sh file on the worker - NO LUCK; tried a
>> dynamic path from the code in pycharm - NO LUCK...; searched the web and
>> asked the question in almost every online forum - NO LUCK...; banged my head
>> several times against pyspark/hadoop books - NO LUCK... Finally, one fine day
>> a 'watermelon' dropped while brooding on this problem and I installed the
>> pyspark binaries on all the worker machines). Now when I tried executing just
>> the pyspark command on the workers, it worked. Tried some simple program
>> snippets on each worker, and they work too.
>>
>> I am not sure if this will help or not for your use-case.
>>
>>
>>
>> Sincerely,
>> Ashish
>>
>> On Mon, Sep 7, 2015 at 11:04 PM, Sasha Kacanski 
>> wrote:
>>
>>> Thanks Ashish,
>>> nice blog but does not cover my issue. Actually I have pycharm running
>>> and loading pyspark and rest of libraries perfectly fine.
>>> My issue is that I am not sure what is triggering
>>>
>>> Error from python worker:
>>>   /cube/PY/Python27/bin/python: No module named pyspark
>>> pyspark
>>> PYTHONPATH was:
>>>
>>> /tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/filecache/18/spark-assembly-1.
>>> 4.1-hadoop2.6.0.jar
>>>
>>> Question is why is YARN not getting the Python package to run on the single
>>> node via YARN?
>>> Some people are saying to run with Java 6 due to zip library changes
>>> between 6/7/8, some identified a bug with RH (I am on Debian), and then there
>>> are some documentation errors, but nothing is really clear.
>>>
>>> I have binaries for Spark and Hadoop and I did just fine with the Spark SQL
>>> module, Hive, Python, pandas and YARN.
>>> Locally, as I said, the app is working fine (pandas to Spark DF to Parquet).
>>> But as soon as I move to yarn-client mode, YARN is not getting the packages
>>> required to run the app.
>>>
>>> If someone confirms that I need to build everything from source with
>>> specific version of software I will do that, but at this point I am not
>>> sure what to do to remedy this situation...
>>>
>>> --sasha
>>>
>>>
>>> On Sun, Sep 6, 2015 at 8:27 PM, Ashish Dutt 
>>> wrote:
>>>
 Hi Aleksandar,
 Quite some time ago, I faced the same problem and I found a solution
 which I have posted here on my blog
 .
 See if that can help you and if it does not then you can check out
 these questions & solution on stackoverflow
  website


 Sincerely,
 Ashish Dutt


 On Mon, Sep 7, 2015 at 7:17 AM, Sasha Kacanski 
 wrote:

> Hi,
> I am successfully running python app via pyCharm in local mode
> setMaster("local[*]")
>
> When I turn on SparkConf().setMaster("yarn-client")
>
> and run via
>
> spark-submit PysparkPandas.py
>
>
> I run into issue:
> Error from python worker:
>   /cube/PY/Python27/bin/python: No module named pyspark
> PYTHONPATH was:
>
> /tmp/hadoop-hadoop/nm-local-dir/usercache/hadoop/filecache/18/spark-assembly-1.4.1-hadoop2.6.0.jar
>
> I am running java
> hadoop@pluto:~/pySpark$ /opt/java/jdk/bin/java -version
> java version "1.8.0_31"
> Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
> Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
>
> Should I try same thing with java 6/7
>
> Is this packaging issue or I have something wrong with configurations

Task serialization error for mllib.MovieLensALS

2015-09-09 Thread Jeff Zhang
I ran MovieLensALS, but met the following error. The weird thing is that this
issue only appears under OpenJDK. This is based on 1.5. I found several related
tickets; I'm not sure whether anyone else has met the same issue and knows the
solution? Thanks

https://issues.apache.org/jira/browse/SPARK-4672
https://issues.apache.org/jira/browse/SPARK-4838



Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task serialization failed: java.lang.StackOverflowError
java.io.ObjectOutputStream$BlockDataOutputStream.write(ObjectOutputStream.java:1841)
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1534)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
scala.collection.immutable.$colon$colon.writeObject(List.scala:379)
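
One mitigation often discussed for this kind of deep-lineage serialization
failure is periodic checkpointing, which truncates the chain that has to be
serialized per task. A rough sketch (not verified against this particular run;
the checkpoint path is illustrative):

import org.apache.spark.mllib.recommendation.ALS

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")  // illustrative path

// Checkpoint every few iterations so the lineage stays short.
val model = new ALS()
  .setRank(10)
  .setIterations(20)
  .setCheckpointInterval(2)
  .run(ratings)   // ratings: RDD[Rating], as built in the MovieLensALS example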

-- 
Best Regards

Jeff Zhang


Re: foreachRDD causing executor lost failure

2015-09-09 Thread Akhil Das
If you look a bit into the executor logs, you will see the exact reason
(mostly an OOM/GC issue etc.). Instead of using foreach, try to use
mapPartitions or foreachPartition.
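
Roughly, that shape would be (just a sketch, reusing the dstream from the
question below; the lookup/save steps stand in for whatever Cassandra client or
connector is in use):

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Sketch: open one Cassandra session/connection per partition here,
    // rather than doing per-record connections inside foreach.
    records.foreach { record =>
      // look up the existing row for `record` and write the updated row back
    }
    // close the session here
  }
}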

Thanks
Best Regards

On Tue, Sep 8, 2015 at 10:45 PM, Priya Ch 
wrote:

> Hello All,
>
>  I am using foreachRDD in my code as -
>
>   dstream.foreachRDD { rdd => rdd.foreach { record => // look up with
> cassandra table
> // save updated rows to cassandra table.
> }
> }
>  This foreachRDD is causing an executor lost failure. What is the behavior of
> this foreachRDD?
>
> Thanks,
> Padma Ch
>


Re: java.lang.NoSuchMethodError and yarn-client mode

2015-09-09 Thread Aniket Bhatnagar
Hi Tom

There has to be a difference in classpaths in yarn-client and yarn-cluster
mode. Perhaps a good starting point would be to print classpath as a first
thing in SimpleApp.main. It should give clues around why it works in
yarn-cluster mode.
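
For example, something along these lines as the first statement of main should
print the jars on the classpath (a sketch; it assumes the system classloader is
a URLClassLoader, which is typical on Java 7/8):

import java.net.URLClassLoader

// Call printClasspath() at the top of SimpleApp.main.
def printClasspath(): Unit = ClassLoader.getSystemClassLoader match {
  case urlCl: URLClassLoader => urlCl.getURLs.foreach(url => println(url))
  case other                 => println(s"not a URLClassLoader: $other")
}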

Thanks,
Aniket

On Wed, Sep 9, 2015, 2:11 PM Tom Seddon  wrote:

> Hi,
>
> I have a problem trying to get a fairly simple app working which makes use
> of native avro libraries.  The app runs fine on my local machine and in
> yarn-cluster mode, but when I try to run it on EMR yarn-client mode I get
> the error below.  I'm aware this is a version problem, as EMR runs an
> earlier version of avro, and I am trying to use avro-1.7.7.
>
> What's confusing me a great deal is the fact that this runs fine in
> yarn-cluster mode.
>
> What is it about yarn-cluster mode that means the application has access
> to the correct version of the avro library?  I need to run in yarn-client
> mode as I will be caching data to the driver machine in between batches.  I
> think in yarn-cluster mode the driver can run on any machine in the cluster
> so this would not work.
>
> Grateful for any advice as I'm really stuck on this.  AWS support are
> trying but they don't seem to know why this is happening either!
>
> Just to note, I'm aware of Databricks spark-avro project and have used
> it.  This is an investigation to see if I can use RDDs instead of
> dataframes.
>
> java.lang.NoSuchMethodError:
> org.apache.avro.Schema$Parser.parse(Ljava/lang/String;[Ljava/lang/String;)Lorg/apache/avro/Schema;
> at ophan.thrift.event.Event.(Event.java:10)
> at SimpleApp$.main(SimpleApp.scala:25)
> at SimpleApp.main(SimpleApp.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> Thanks,
>
> Tom
>
>
>


Re: Partitions with zero records & variable task times

2015-09-09 Thread Akhil Das
This post here has a bit information
http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/
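
For reference, the custom-partitioner suggestion from the earlier reply (quoted
below) would look roughly like this; the key type, bucket count and usage are
assumptions, not taken from the job in question:

import org.apache.spark.Partitioner

// Sketch of a custom partitioner: spread keys over a fixed number of buckets.
// Replace the hashing with whatever distributes your particular keys evenly.
class EvenPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = {
    val mod = key.hashCode % parts
    if (mod < 0) mod + parts else mod   // keep the bucket index non-negative
  }
}

// usage, e.g.: keyedRdd.reduceByKey(new EvenPartitioner(32), _ + _)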

Thanks
Best Regards

On Wed, Sep 9, 2015 at 6:44 AM, mark  wrote:

> As I understand things (maybe naively), my input data are stored in equal
> sized blocks in HDFS, and each block  represents a partition within Spark
> when read from HDFS, therefore each block should hold roughly the same
> number of records.
>
> So something is missing in my understanding - what can cause some
> partitions to have zero records and others to have roughly equal sized
> chunks (~50k in this case)?
>
> Before writing a custom partitioner, I would like to understand why the
> default partitioner has failed in my case.
> On 8 Sep 2015 3:00 pm, "Akhil Das"  wrote:
>
>> Try using a custom partitioner for the keys so that they will get evenly
>> distributed across tasks
>>
>> Thanks
>> Best Regards
>>
>> On Fri, Sep 4, 2015 at 7:19 PM, mark  wrote:
>>
>>> I am trying to tune a Spark job and have noticed some strange behavior -
>>> tasks in a stage vary in execution time, ranging from 2 seconds to 20
>>> seconds. I assume tasks should all run in roughly the same amount of time
>>> in a well tuned job.
>>>
>>> So I did some investigation - the fast tasks appear to have no records,
>>> whilst the slow tasks do. I need help understanding why this is happening.
>>>
>>> The code in the stage is pretty simple. All it does is:
>>>
>>> - filters records
>>> - maps records to a (key, record) tuple
>>> - reduces by key
>>>
>>> The data are Avro objects stored in Parquet files in 16MB blocks in HDFS.
>>>
>>> To establish how many records in each partition I added this snippet:
>>>
> >>> val counts = rdd.mapPartitions(iter => {
> >>>   val ctx = TaskContext.get
> >>>   val stageId = ctx.stageId
> >>>   val partId = ctx.partitionId
> >>>   val attemptId = ctx.taskAttemptId()
> >>>   Array(Array(stageId, partId, attemptId, iter.size)).iterator
> >>> }, true).collect()
>>>
>>> Which produces the following:
>>>
>>> 1  1  0  0
>>> 1  2  1  50489
>>> 1  3  2  0
>>> 1  4  3  0
>>> 1  5  4  0
>>> 1  6  5  53200
>>> 1  7  6  0
>>> 1  8  7  0
>>> 1  9  8  0
>>> 1  10   9  56946
>>> 1  11   10   0
>>> 1  12   11   0
>>> 1  13   12   0
>>> 1  14   13   59209
>>> 1  15   14   0
>>> 1  16   15   0
>>> 1  17   16   0
>>> 1  18   17   50202
>>> 1  19   18   0
>>> 1  20   19   0
>>> 1  21   20   0
>>> 1  22   21   54613
>>> 1  23   22   0
>>> 1  24   23   0
>>> 1  25   24   54157
>>> 1  26   25   0
>>> 1  27   26   0
>>> 1  28   27   0
>>> 1  29   28   53595
>>> 1  30   29   0
>>> 1  31   30   0
>>> 1  32   31   10750
>>>
>>>
>>> Looking at the logs, you can see the tasks that contain records have the
>>> longest run time:
>>>
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0
>>> (TID 26) in 2782 ms on DG1322 (6/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0
>>> (TID 8) in 2815 ms on DG1322 (7/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0
>>> (TID 20) in 2815 ms on DG1322 (8/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0
>>> (TID 24) in 2840 ms on DG1321 (9/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0
>>> (TID 30) in 2839 ms on DG1321 (10/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0
>>> (TID 12) in 2878 ms on DG1321 (11/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0
>>> (TID 31) in 2870 ms on DG1321 (12/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0
>>> (TID 19) in 2892 ms on DG1321 (13/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0
>>> (TID 1) in 2930 ms on DG1321 (14/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0
>>> (TID 7) in 2934 ms on 

Re: Spark ANN

2015-09-09 Thread Feynman Liang
My 2 cents:

* There is frequency-domain processing available already (e.g. the spark.ml DCT
transformer; see the sketch below) but no FFT transformer yet, because complex
numbers are not currently a Spark SQL datatype
* We shouldn't assume signals are even, so we need complex numbers to
implement the FFT
* I have not closely studied the relative performance tradeoffs, so please
do let me know if there's a significant difference in practice
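
For concreteness, the existing frequency-domain support looks like this (a
minimal sketch against the Spark 1.5 spark.ml API; it assumes a SQLContext
named sqlContext, and the input values are made up):

import org.apache.spark.ml.feature.DCT
import org.apache.spark.mllib.linalg.Vectors

val data = Seq(Vectors.dense(0.0, 1.0, -2.0, 3.0)).map(Tuple1.apply)
val df = sqlContext.createDataFrame(data).toDF("features")

// Discrete cosine transform: real-valued, so no complex-number column type is needed.
val dct = new DCT()
  .setInputCol("features")
  .setOutputCol("featuresDCT")
  .setInverse(false)

dct.transform(df).show()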



On Tue, Sep 8, 2015 at 5:46 PM, Ulanov, Alexander 
wrote:

> That is an option too. Implementing convolutions with FFTs should be
> considered as well http://arxiv.org/pdf/1312.5851.pdf.
>
>
>
> *From:* Feynman Liang [mailto:fli...@databricks.com]
> *Sent:* Tuesday, September 08, 2015 12:07 PM
> *To:* Ulanov, Alexander
> *Cc:* Ruslan Dautkhanov; Nick Pentreath; user; na...@yandex.ru
> *Subject:* Re: Spark ANN
>
>
>
> Just wondering, why do we need tensors? Is the implementation of convnets
> using im2col (see here )
> insufficient?
>
>
>
> On Tue, Sep 8, 2015 at 11:55 AM, Ulanov, Alexander <
> alexander.ula...@hpe.com> wrote:
>
> Ruslan, thanks for including me in the discussion!
>
>
>
> Dropout and other features such as Autoencoder were implemented, but not
> merged yet in order to have room for improving the internal Layer API. For
> example, there is an ongoing work with convolutional layer that
> consumes/outputs 2D arrays. We’ll probably need to change the Layer’s
> input/output type to tensors. This will influence dropout which will need
> some refactoring to handle tensors too. Also, all new components should
> have ML pipeline public interface. There is an umbrella issue for deep
> learning in Spark https://issues.apache.org/jira/browse/SPARK-5575 which
> includes various features of Autoencoder, in particular
> https://issues.apache.org/jira/browse/SPARK-10408. You are very welcome
> to join and contribute since there is a lot of work to be done.
>
>
>
> Best regards, Alexander
>
> *From:* Ruslan Dautkhanov [mailto:dautkha...@gmail.com]
> *Sent:* Monday, September 07, 2015 10:09 PM
> *To:* Feynman Liang
> *Cc:* Nick Pentreath; user; na...@yandex.ru
> *Subject:* Re: Spark ANN
>
>
>
> Found a dropout commit from avulanov:
>
>
> https://github.com/avulanov/spark/commit/3f25e26d10ef8617e46e35953fe0ad1a178be69d
>
>
>
> It probably hasn't made its way to MLLib (yet?).
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 8:34 PM, Feynman Liang 
> wrote:
>
> Unfortunately, not yet... Deep learning support (autoencoders, RBMs) is on
> the roadmap for 1.6 
> though, and there is a spark package
>  for
> dropout regularized logistic regression.
>
>
>
>
>
> On Mon, Sep 7, 2015 at 3:15 PM, Ruslan Dautkhanov 
> wrote:
>
> Thanks!
>
>
>
> It does not look Spark ANN yet supports dropout/dropconnect or any other
> techniques that help avoiding overfitting?
>
> http://www.cs.toronto.edu/~rsalakhu/papers/srivastava14a.pdf
>
> https://cs.nyu.edu/~wanli/dropc/dropc.pdf
>
>
>
> ps. There is a small copy-paste typo in
>
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala#L43
>
> should read B :)
>
>
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
> On Mon, Sep 7, 2015 at 12:47 PM, Feynman Liang 
> wrote:
>
> Backprop is used to compute the gradient here
> ,
> which is then optimized by SGD or LBFGS here
> 
>
>
>
> On Mon, Sep 7, 2015 at 11:24 AM, Nick Pentreath 
> wrote:
>
> Haven't checked the actual code but that doc says "MLPC employs
> backpropagation for learning the model. .."?
>
>
>
>
>
>
> —
> Sent from Mailbox 
>
>
>
> On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanov 
> wrote:
>
> http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html
>
>
>
> Implementation seems missing backpropagation?
>
> Was there is a good reason to omit BP?
>
> What are the drawbacks of a pure feedforward-only ANN?
>
>
>
> Thanks!
>
>
>
> --
> Ruslan Dautkhanov
>
>
>
>
>
>
>
>
>
>
>
>
>


Re: Best way to import data from Oracle to Spark?

2015-09-09 Thread Reynold Xin
Using the JDBC data source is probably the best way.
http://spark.apache.org/docs/1.4.1/sql-programming-guide.html#jdbc-to-other-databases
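
For example (a sketch only; the connection details and table name are
placeholders to adapt, and the Oracle JDBC driver must be on the classpath):

val oracleDF = sqlContext.read.format("jdbc").options(Map(
  "url"      -> "jdbc:oracle:thin:@//dbhost:1521/ORCL",   // placeholder
  "dbtable"  -> "MYSCHEMA.MYTABLE",                        // placeholder
  "user"     -> "user",
  "password" -> "password",
  "driver"   -> "oracle.jdbc.OracleDriver"
)).load()

oracleDF.registerTempTable("mytable")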

On Tue, Sep 8, 2015 at 10:11 AM, Cui Lin  wrote:

> What's the best way to import data from Oracle to Spark? Thanks!
>
>
> --
> Best regards!
>
> Lin,Cui
>


java.lang.NoSuchMethodError and yarn-client mode

2015-09-09 Thread Tom Seddon
Hi,

I have a problem trying to get a fairly simple app working which makes use
of native avro libraries.  The app runs fine on my local machine and in
yarn-cluster mode, but when I try to run it on EMR yarn-client mode I get
the error below.  I'm aware this is a version problem, as EMR runs an
earlier version of avro, and I am trying to use avro-1.7.7.

What's confusing me a great deal is the fact that this runs fine in
yarn-cluster mode.

What is it about yarn-cluster mode that means the application has access to
the correct version of the avro library?  I need to run in yarn-client mode
as I will be caching data to the driver machine in between batches.  I
think in yarn-cluster mode the driver can run on any machine in the cluster
so this would not work.

Grateful for any advice as I'm really stuck on this.  AWS support are
trying but they don't seem to know why this is happening either!

Just to note, I'm aware of Databricks spark-avro project and have used it.
This is an investigation to see if I can use RDDs instead of dataframes.

java.lang.NoSuchMethodError:
org.apache.avro.Schema$Parser.parse(Ljava/lang/String;[Ljava/lang/String;)Lorg/apache/avro/Schema;
at ophan.thrift.event.Event.(Event.java:10)
at SimpleApp$.main(SimpleApp.scala:25)
at SimpleApp.main(SimpleApp.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Thanks,

Tom


Re: How to read compressed parquet file

2015-09-09 Thread Cheng Lian
You need to use "har://" instead of "hdfs://" to read HAR files. Just 
tested against Spark 1.5, and it works as expected.
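
For example (a sketch; the archive name and inner path layout are illustrative,
and a SQLContext named sqlContext is assumed):

// The inner path mirrors the directory layout inside the archive.
val df = sqlContext.read.parquet("har:///user/me/combined.har/parquet-dir")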


Cheng

On 9/9/15 3:29 PM, 李铖 wrote:
I think too many parquet files may affect reading capability, so I
use hadoop archive to combine them, but
sql_context.read.parquet(output_path) does not work on the archived file.

How can I fix it? Please help me.
:)




How to read compressed parquet file

2015-09-09 Thread 李铖
I think too many parquet files may affect reading capability, so I use
hadoop archive to combine them, but sql_context.read.parquet(output_path)
does not work on the archived file.
How can I fix it? Please help me.
:)


Re: No auto decompress in Spark Java textFile function?

2015-09-09 Thread Akhil Das
textFile used to work with .gz files; I haven't tested it on bz2 files. If
it isn't decompressing by default, then what you have to do is use
sc.wholeTextFiles and then decompress each record (each record being a file)
with the corresponding codec.
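
A rough sketch of that idea for .tar.bz2 files, using sc.binaryFiles (better
suited to binary input than wholeTextFiles) together with Apache Commons
Compress; it assumes commons-compress is on the classpath, the path is
illustrative, and the archive entries are plain text:

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import scala.io.Source

val archives = sc.binaryFiles("hdfs:///data/*.tar.bz2")
val lines = archives.flatMap { case (path, stream) =>
  val tar = new TarArchiveInputStream(new BZip2CompressorInputStream(stream.open()))
  // Walk the tar entries and read each file entry's lines.
  Iterator.continually(tar.getNextTarEntry)
    .takeWhile(_ != null)
    .filterNot(_.isDirectory)
    .flatMap(_ => Source.fromInputStream(tar).getLines())
    .toList
}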

Thanks
Best Regards

On Tue, Sep 8, 2015 at 6:49 PM, Chris Teoh  wrote:

> Hi Folks,
>
> I tried using Spark v1.2 on bz2 files in Java but the behaviour is
> different to the same textFile API call in Python and Scala.
>
> That being said, how do I process to read .tar.bz2 files in Spark's Java
> API?
>
> Thanks in advance
> Chris
>


[ANNOUNCE] Announcing Spark 1.5.0

2015-09-09 Thread Reynold Xin
Hi All,

Spark 1.5.0 is the sixth release on the 1.x line. This release represents
1400+ patches from 230+ contributors and 80+ institutions. To download
Spark 1.5.0 visit the downloads page.

A huge thanks go to all of the individuals and organizations involved in
development and testing of this release.

Visit the release notes [1] to read about the new features, or download [2]
the release today.

For errata in the contributions or release notes, please e-mail me
*directly* (not on-list).

Thanks to everyone who helped work on this release!

[1] http://spark.apache.org/releases/spark-release-1-5-0.html
[2] http://spark.apache.org/downloads.html


I am very new to Spark. I have a very basic question. I have an array of values: listofECtokens: Array[String] = Array(EC-17A5206955089011B, EC-17A5206955089011A) I want to filter an RDD for all of

2015-09-09 Thread prachicsa


I am very new to Spark.

I have a very basic question. I have an array of values:

listofECtokens: Array[String] = Array(EC-17A5206955089011B,
EC-17A5206955089011A)

I want to filter an RDD for all of these token values. I tried the following
way:

val ECtokens = for (token <- listofECtokens) rddAll.filter(line =>
line.contains(token))

Output:

ECtokens: Unit = ()

I got an empty Unit even when there are records with these tokens. What am I
doing wrong?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/I-am-very-new-to-Spark-I-have-a-very-basic-question-I-have-an-array-of-values-listofECtokens-Array-S-tp24617.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark summit Asia

2015-09-09 Thread mark
http://www.stratahadoopworld.com/singapore/index.html
On 8 Sep 2015 8:35 am, "Kevin Jung"  wrote:

> Is there any plan to hold Spark summit in Asia?
> I'm very much looking forward to it.
>
> Kevin
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-summit-Asia-tp24598.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Filtering records for all values of an array in Spark

2015-09-09 Thread prachicsa


I am very new to Spark.

I have a very basic question. I have an array of values:

listofECtokens: Array[String] = Array(EC-17A5206955089011B,
EC-17A5206955089011A)

I want to filter an RDD for all of these token values. I tried the following
way:

val ECtokens = for (token <- listofECtokens) rddAll.filter(line =>
line.contains(token))

Output:

ECtokens: Unit = ()

I got an empty Unit even when there are records with these tokens. What am I
doing wrong?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Filtering-records-for-all-values-of-an-array-in-Spark-tp24618.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to read files from S3 from Spark local when there is a http proxy

2015-09-09 Thread Steve Loughran

s3a:// has a proxy option 
https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html

s3n: apparently gets set up differently, though I've never tested it 
http://stackoverflow.com/questions/20241953/hadoop-distcp-to-s3-behind-http-proxy

> On 8 Sep 2015, at 13:51, tariq  wrote:
> 
> Hi svelusamy,
> 
> Were you able to make it work? I am facing the exact same problem. Getting
> connection timed when trying to access S3.
> 
> Thank you.
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-read-files-from-S3-from-Spark-local-when-there-is-a-http-proxy-tp21122p24604.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Help getting Spark JDBC metadata

2015-09-09 Thread Tom Barber
Hi guys

Hopefully someone can help me, or at least explain stuff to me.

I use a tool that requires JDBC metadata (tables/columns etc.).

So using spark 1.3.1 I try stuff like:

registerTempTable()
or saveAsTable()

on my parquet file.

The former doesn't show any table metadata for JDBC connections, but you
can query the table, which is annoying.
The latter shows a table, but the column metadata is a single column of
array type; again, I can query the table.

What I found I can do though is create a standard SQL table in beeline with
all its columns defined, and then insert into that table the contents of my
invisible parquet table, but I assume that removes the data from parquet
and stores it in hive, and I'd prefer to stick with parquet.

Ideally i'd like to be able to run

CREATE TEMPORARY TABLE XYZ
USING org.apache.spark.sql.parquet
OPTIONS (
  path "/user/ubuntu/file_with_id.par"
  -- define my table columns
)

Is something like that possible, does that make any sense?

Thanks

Tom


Re: java.lang.NoSuchMethodError and yarn-client mode

2015-09-09 Thread Ted Yu
Have you checked the contents of __app__.jar ?



> On Sep 9, 2015, at 3:28 AM, Tom Seddon  wrote:
> 
> Thanks for your reply Aniket.
> 
> Ok I've done this and I'm still confused.  Output from running locally shows:
> 
> file:/home/tom/spark-avro/target/scala-2.10/simpleapp.jar
> file:/home/tom/spark-1.4.0-bin-hadoop2.4/conf/
> file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar
> file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar
> file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar
> file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
> file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/sunjce_provider.jar
> file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/zipfs.jar
> file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/localedata.jar
> file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/dnsns.jar
> file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/sunec.jar
> file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/sunpkcs11.jar
> saving text file...
> done!
> 
> In yarn-client mode:
> 
> file:/home/hadoop/simpleapp.jar
> file:/usr/lib/hadoop/hadoop-auth-2.6.0-amzn-0.jar
> ...
> file:/usr/lib/hadoop-mapreduce/avro-1.7.4.jar
> ...
> 
> And in yarn-cluster mode:
> file:/mnt/yarn/usercache/hadoop/appcache/application_1441787021820_0004/container_1441787021820_0004_01_01/__app__.jar
> ...
> file:/usr/lib/hadoop/lib/avro-1.7.4.jar
> ...
> saving text file...
> done!
> 
> In yarn-cluster mode it doesn't appear to have sight of the fat jar 
> (simpleapp), but can see avro-1.7.4, but runs fine!
> 
> Thanks,
> 
> Tom
> 
> 
>> On Wed, Sep 9, 2015 at 9:49 AM Aniket Bhatnagar  
>> wrote:
>> Hi Tom
>> 
>> There has to be a difference in classpaths in yarn-client and yarn-cluster 
>> mode. Perhaps a good starting point would be to print classpath as a first 
>> thing in SimpleApp.main. It should give clues around why it works in 
>> yarn-cluster mode.
>> 
>> Thanks,
>> Aniket
>> 
>> 
>>> On Wed, Sep 9, 2015, 2:11 PM Tom Seddon  wrote:
>>> Hi,
>>> 
>>> I have a problem trying to get a fairly simple app working which makes use 
>>> of native avro libraries.  The app runs fine on my local machine and in 
>>> yarn-cluster mode, but when I try to run it on EMR yarn-client mode I get 
>>> the error below.  I'm aware this is a version problem, as EMR runs an 
>>> earlier version of avro, and I am trying to use avro-1.7.7.
>>> 
>>> What's confusing me a great deal is the fact that this runs fine in 
>>> yarn-cluster mode.
>>> 
>>> What is it about yarn-cluster mode that means the application has access to 
>>> the correct version of the avro library?  I need to run in yarn-client mode 
>>> as I will be caching data to the driver machine in between batches.  I 
>>> think in yarn-cluster mode the driver can run on any machine in the cluster 
>>> so this would not work.
>>> 
>>> Grateful for any advice as I'm really stuck on this.  AWS support are 
>>> trying but they don't seem to know why this is happening either!
>>> 
>>> Just to note, I'm aware of Databricks spark-avro project and have used it.  
>>> This is an investigation to see if I can use RDDs instead of dataframes.
>>> 
>>> java.lang.NoSuchMethodError: 
>>> org.apache.avro.Schema$Parser.parse(Ljava/lang/String;[Ljava/lang/String;)Lorg/apache/avro/Schema;
>>> at ophan.thrift.event.Event.(Event.java:10)
>>> at SimpleApp$.main(SimpleApp.scala:25)
>>> at SimpleApp.main(SimpleApp.scala)
>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> at 
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>>> at 
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> at java.lang.reflect.Method.invoke(Method.java:606)
>>> at 
>>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>>> at 
>>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>> 
>>> Thanks,
>>> 
>>> Tom


RE: Support of other languages?

2015-09-09 Thread Sun, Rui
Hi, Rahul,

To support a new language other than Java/Scala in spark, it is different 
between RDD API and DataFrame API.

For RDD API:

RDD is a distributed collection of the language-specific data types whose 
representation is unknown to JVM. Also transformation functions for RDD are 
written in the language which can't be executed on JVM. That's why worker 
processes of the language runtime are needed in such case. Generally, to 
support RDD API in the language, a subclass of the Scala RDD is needed on JVM 
side (for example, PythonRDD for python, RRDD for R) where compute() is 
overridden to send the serialized parent partition data (yes, what you mean 
data copy happens here) and the serialized transformation function via socket 
to the worker process. The worker process deserializes the partition data and 
the transformation function, then applies the function to the data. The result 
is sent back to JVM via socket after serialization as byte array. From JVM's 
viewpoint, the resulting RDD is a collection of byte arrays.

Performance is a concern in such case, as there are overheads, like launching 
of worker processes, serialization/deserialization of partition data, 
bi-directional communication cost of the data.
Besides, as the JVM can't know the real representation of data in the RDD, it 
is difficult and complex to support shuffle and aggregation operations. The 
Spark Core's built-in aggregator and shuffle can't be utilized directly. There 
should be language specific implementation to support these operations, which 
cause additional overheads.

Additional memory occupation by the worker processes is also a concern.

For DataFrame API:

Things are much simpler than RDD API. For DataFrame, data is read from Data 
Source API and is represented as native objects within the JVM and there is no 
language-specific transformation functions. Basically, DataFrame API in the 
language are just method wrappers to the corresponding ones in Scala DataFrame 
API.

Performance is not a concern. The computation is done on native objects in JVM, 
virtually no performance lost.

The only exception is UDF in DataFrame. The UDF() has to rely on language 
worker processes, similar to RDD API.

-Original Message-
From: Rahul Palamuttam [mailto:rahulpala...@gmail.com] 
Sent: Tuesday, September 8, 2015 10:54 AM
To: user@spark.apache.org
Subject: Support of other languages?

Hi,
I wanted to know more about how Spark supports R and Python, with respect to 
what gets copied into the language environments.

To clarify :

I know that PySpark utilizes py4j sockets to pass pickled python functions 
between the JVM and the python daemons. However, I wanted to know how it passes 
the data from the JVM into the daemon environment. I assume it has to copy the 
data over into the new environment, since python can't exactly operate in JVM 
heap space, (or can it?).  

I had the same question with respect to SparkR, though I'm not completely 
familiar with how they pass around native R code through the worker JVM's. 

The primary question I wanted to ask is does Spark make a second copy of data, 
so language-specific daemons can operate on the data? What are some of the 
other limitations encountered when we try to offer multi-language support, 
whether it's in performance or in general software architecture.
With python in particular the collect operation must be first written to disk 
and then read back from the python driver process.

Would appreciate any insight on this, and if there is any work happening in 
this area.

Thank you,

Rahul Palamuttam  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Support-of-other-languages-tp24599.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



bad substitution for [hdp.version] Error in spark on YARN job

2015-09-09 Thread Jeetendra Gangele
Hi ,
I am getting below error when running the spark job on YARN with HDP
cluster.
I have installed spark and yarn from Ambari and I am using spark 1.3.1 with
HDP version HDP-2.3.0.0-2557.

My spark-default.conf has correct entry

spark.driver.extraJavaOptions -Dhdp.version=2.3.0.0-2557
spark.yarn.am.extraJavaOptions -Dhdp.version=2.3.0.0-2557

can anybody from HDP reply on this? Not sure why hdp.version is not getting
passed though it is set up in the conf file correctly. I tried passing the same
to spark-submit with --conf "hdp.version=2.3.0.0-2557"; same issue, no luck.

I am running my job with spark-submit from spark-client machine



Exit code: 1
Exception message:
/hadoop/yarn/local/usercache/hdfs/appcache/application_1441798371988_0002/container_e08_1441798371988_0002_01_05/launch_container.sh:
line 22:
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
bad substitution

Stack trace: ExitCodeException exitCode=1:
/hadoop/yarn/local/usercache/hdfs/appcache/application_1441798371988_0002/container_e08_1441798371988_0002_01_05/launch_container.sh:
line 22:
$PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
bad substitution

at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
at org.apache.hadoop.util.Shell.run(Shell.java:456)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
at
org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at
org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Container exited with a non-zero exit code 1


Re: I am very new to Spark. I have a very basic question. I have an array of values: listofECtokens: Array[String] = Array(EC-17A5206955089011B, EC-17A5206955089011A) I want to filter an RDD for all o

2015-09-09 Thread Ted Yu
Prachicsa:
If the number of EC tokens is high, please consider using a set instead of 
array for better lookup performance. 

BTW use short, descriptive subject for future emails. 



> On Sep 9, 2015, at 3:13 AM, Akhil Das  wrote:
> 
> Try this:
> 
> val tocks = Array("EC-17A5206955089011B","EC-17A5206955089011A")
> 
> val rddAll = sc.parallelize(List("This contains EC-17A5206955089011B","This 
> doesnt"))
> 
> rddAll.filter(line => {
> var found = false
> for(item <- tocks){
>if(line.contains(item)) found = true
> }
>found
>   }).collect()
> 
> 
> Output:
> res8: Array[String] = Array(This contains EC-17A5206955089011B)
> 
> Thanks
> Best Regards
> 
>> On Wed, Sep 9, 2015 at 3:25 PM, prachicsa  wrote:
>> 
>> 
>> I am very new to Spark.
>> 
>> I have a very basic question. I have an array of values:
>> 
>> listofECtokens: Array[String] = Array(EC-17A5206955089011B,
>> EC-17A5206955089011A)
>> 
>> I want to filter an RDD for all of these token values. I tried the following
>> way:
>> 
>> val ECtokens = for (token <- listofECtokens) rddAll.filter(line =>
>> line.contains(token))
>> 
>> Output:
>> 
>> ECtokens: Unit = ()
>> 
>> I got an empty Unit even when there are records with these tokens. What am I
>> doing wrong?
>> 
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/I-am-very-new-to-Spark-I-have-a-very-basic-question-I-have-an-array-of-values-listofECtokens-Array-S-tp24617.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: Why is huge data shuffling in Spark when using union()/coalesce(1,false) on DataFrame?

2015-09-09 Thread Umesh Kacha
Hi Richard, thanks for the response. My use case is a bit unusual: I need to
process data row by row for one partition and update the required rows (roughly
30% of the rows get updated). As per the suggestions in the stackoverflow.com
answer above, I refactored the code to use mapPartitionsWithIndex:

JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
      @Override
      public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator) throws Exception {
        List<Row> rowList = new ArrayList<>();
        while (rowIterator.hasNext()) {
          Row row = rowIterator.next();
          List rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
          Row updatedRow = RowFactory.create(rowAsList.toArray());
          rowList.add(updatedRow);
        }
        return rowList.iterator();
      }
    }, true).union(remainingrdd).coalesce(200, true);

The above code still hits memory limits, as I have 2 TB of data to process. I
use the resulting RDD to create a DataFrame, which I then register as a temp
table using hiveContext and run a few insert-into-partition queries with
hiveContext.sql.

Please help me optimize the above code.
On Sep 9, 2015 2:55 AM, "Richard Marscher"  wrote:

> Hi,
>
> what is the reasoning behind the use of `coalesce(1,false)`? This is
> saying to aggregate all data into a single partition, which must fit in
> memory on one node in the Spark cluster. If the cluster has more than one
> node it must shuffle to move the data. It doesn't seem like the following
> map or union necessitate coalesce, but the use case is not clear to me.
>
> On Fri, Sep 4, 2015 at 12:29 PM, unk1102  wrote:
>
>> Hi I have Spark job which does some processing on ORC data and stores back
>> ORC data using DataFrameWriter save() API introduced in Spark 1.4.0. I
>> have
>> the following piece of code which is using heavy shuffle memory. How do I
>> optimize below code? Is there anything wrong with it? It is working fine
>> as
>> expected only causing slowness because of GC pause and shuffles lots of
>> data
>> so hitting memory issues. Please guide I am new to Spark. Thanks in
>> advance.
>>
>> JavaRDD updatedDsqlRDD = orderedFrame.toJavaRDD().coalesce(1,
>> false).map(new Function() {
>>@Override
>>public Row call(Row row) throws Exception {
>> List rowAsList;
>> Row row1 = null;
>> if (row != null) {
>>   rowAsList = iterate(JavaConversions.seqAsJavaList(row.toSeq()));
>>   row1 = RowFactory.create(rowAsList.toArray());
>> }
>> return row1;
>>}
>> }).union(modifiedRDD);
>> DataFrame updatedDataFrame =
>> hiveContext.createDataFrame(updatedDsqlRDD,renamedSourceFrame.schema());
>>
>> updatedDataFrame.write().mode(SaveMode.Append).format("orc").partitionBy("entity",
>> "date").save("baseTable");
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Why-is-huge-data-shuffling-in-Spark-when-using-union-coalesce-1-false-on-DataFrame-tp24581.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com  | Our Blog
>  | Twitter  |
> Facebook  | LinkedIn
> 
>


I want to know the parition result in each node

2015-09-09 Thread szy
I am doing scientific research about graph computing, and I decided to use
Spark GraphX for some experiments. Now I have a question about partitioning:
how can I find out which partition ends up on which worker in my Spark program?
For example, suppose a graph includes vertices 1, 2, 3, 4 and I have two
workers. After executing a PartitionStrategy, I want to get worker1's vertices
and worker2's vertices; maybe 1, 2 and 3 are on worker1 and 4 is on worker2.
But I don't know how to get the partition result in my program.
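
A rough sketch of one way to inspect this from inside the program (the executor
host can also be read from the Spark UI; `graph` is assumed to be the
partitioned Graph):

import java.net.InetAddress

// After graph.partitionBy(...), list each partition's vertex ids and the host
// that computed it. (GraphX's PartitionStrategy repartitions edges;
// graph.edges can be inspected the same way.)
val placement = graph.vertices
  .mapPartitionsWithIndex { (partId, iter) =>
    val host = InetAddress.getLocalHost.getHostName
    Iterator((partId, host, iter.map(_._1).toArray))
  }
  .collect()

placement.foreach { case (partId, host, ids) =>
  println(s"partition $partId on $host -> vertices ${ids.mkString(",")}")
}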




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/I-want-to-know-the-parition-result-in-each-node-tp24619.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Applying transformations on a JavaRDD using reflection

2015-09-09 Thread Robin East
Have you got some code already that demonstrates the problem?
> On 9 Sep 2015, at 04:45, Nirmal Fernando  wrote:
> 
> Any thoughts?
> 
> On Tue, Sep 8, 2015 at 3:37 PM, Nirmal Fernando  > wrote:
> Hi All,
> 
> I'd like to apply a chain of Spark transformations (map/filter) on a given
> JavaRDD. I'll have the set of Spark transformations as Function<T, A>, and
> even though I can determine the classes of T and A at runtime, due to type
> erasure I cannot call JavaRDD's transformations, as they expect generics.
> Any idea on how to resolve this?
> 
> -- 
> 
> Thanks & regards,
> Nirmal
> 
> Team Lead - WSO2 Machine Learner
> Associate Technical Lead - Data Technologies Team, WSO2 Inc.
> Mobile: +94715779733 
> Blog: http://nirmalfdo.blogspot.com/ 
> 
> 
> 
> 
> 
> -- 
> 
> Thanks & regards,
> Nirmal
> 
> Team Lead - WSO2 Machine Learner
> Associate Technical Lead - Data Technologies Team, WSO2 Inc.
> Mobile: +94715779733
> Blog: http://nirmalfdo.blogspot.com/ 
> 
> 



Re: I am very new to Spark. I have a very basic question. I have an array of values: listofECtokens: Array[String] = Array(EC-17A5206955089011B, EC-17A5206955089011A) I want to filter an RDD for all o

2015-09-09 Thread Akhil Das
Try this:

val tocks = Array("EC-17A5206955089011B", "EC-17A5206955089011A")

val rddAll = sc.parallelize(List("This contains EC-17A5206955089011B",
  "This doesnt"))

rddAll.filter(line => {
  var found = false
  for (item <- tocks) {
    if (line.contains(item)) found = true
  }
  found
}).collect()


Output:
res8: Array[String] = Array(This contains EC-17A5206955089011B)
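
An equivalent, more compact filter (just a sketch of the same logic):

val result = rddAll.filter(line => tocks.exists(token => line.contains(token))).collect()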

Thanks
Best Regards

On Wed, Sep 9, 2015 at 3:25 PM, prachicsa  wrote:

>
>
> I am very new to Spark.
>
> I have a very basic question. I have an array of values:
>
> listofECtokens: Array[String] = Array(EC-17A5206955089011B,
> EC-17A5206955089011A)
>
> I want to filter an RDD for all of these token values. I tried the
> following
> way:
>
> val ECtokens = for (token <- listofECtokens) rddAll.filter(line =>
> line.contains(token))
>
> Output:
>
> ECtokens: Unit = ()
>
> I got an empty Unit even when there are records with these tokens. What am
> I
> doing wrong?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/I-am-very-new-to-Spark-I-have-a-very-basic-question-I-have-an-array-of-values-listofECtokens-Array-S-tp24617.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: How to read compressed parquet file

2015-09-09 Thread 李铖
It works (on Spark 1.4).
Thanks a lot.

2015-09-09 17:21 GMT+08:00 Cheng Lian :

> You need to use "har://" instead of "hdfs://" to read HAR files. Just
> tested against Spark 1.5, and it works as expected.
>
> Cheng
>
>
> On 9/9/15 3:29 PM, 李铖 wrote:
>
> I think too many parquet files may be affect reading capability,so I use
> hadoop archive to combine them,but  sql_context.read.parquet(output_path)
> does not work on the file.
> How to fix it ,please help me.
> :)
>
>
>


Re: java.lang.NoSuchMethodError and yarn-client mode

2015-09-09 Thread Tom Seddon
Thanks for your reply Aniket.

Ok I've done this and I'm still confused.  Output from running locally
shows:

file:/home/tom/spark-avro/target/scala-2.10/simpleapp.jar
file:/home/tom/spark-1.4.0-bin-hadoop2.4/conf/
file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-hadoop2.4.0.jar
file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-core-3.2.10.jar
file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-api-jdo-3.2.6.jar
file:/home/tom/spark-1.4.0-bin-hadoop2.4/lib/datanucleus-rdbms-3.2.9.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/sunjce_provider.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/zipfs.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/localedata.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/dnsns.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/sunec.jar
file:/usr/lib/jvm/java-7-oracle/jre/lib/ext/sunpkcs11.jar
saving text file...
done!

In yarn-client mode:

file:/home/hadoop/simpleapp.jar
file:/usr/lib/hadoop/hadoop-auth-2.6.0-amzn-0.jar
...
*file:/usr/lib/hadoop-mapreduce/avro-1.7.4.jar*
...

And in yarn-cluster mode:
file:/mnt/yarn/usercache/hadoop/appcache/application_1441787021820_0004/container_1441787021820_0004_01_01/__app__.jar
...
*file:/usr/lib/hadoop/lib/avro-1.7.4.jar*
...
saving text file...
done!

In yarn-cluster mode it doesn't appear to have sight of the fat jar
(simpleapp), but can see avro-1.7.4, but runs fine!

Thanks,

Tom


On Wed, Sep 9, 2015 at 9:49 AM Aniket Bhatnagar 
wrote:

> Hi Tom
>
> There has to be a difference in classpaths in yarn-client and yarn-cluster
> mode. Perhaps a good starting point would be to print classpath as a first
> thing in SimpleApp.main. It should give clues around why it works in
> yarn-cluster mode.
>
> Thanks,
> Aniket
>
> On Wed, Sep 9, 2015, 2:11 PM Tom Seddon  wrote:
>
>> Hi,
>>
>> I have a problem trying to get a fairly simple app working which makes
>> use of native avro libraries.  The app runs fine on my local machine and in
>> yarn-cluster mode, but when I try to run it on EMR yarn-client mode I get
>> the error below.  I'm aware this is a version problem, as EMR runs an
>> earlier version of avro, and I am trying to use avro-1.7.7.
>>
>> What's confusing me a great deal is the fact that this runs fine in
>> yarn-cluster mode.
>>
>> What is it about yarn-cluster mode that means the application has access
>> to the correct version of the avro library?  I need to run in yarn-client
>> mode as I will be caching data to the driver machine in between batches.  I
>> think in yarn-cluster mode the driver can run on any machine in the cluster
>> so this would not work.
>>
>> Grateful for any advice as I'm really stuck on this.  AWS support are
>> trying but they don't seem to know why this is happening either!
>>
>> Just to note, I'm aware of Databricks spark-avro project and have used
>> it.  This is an investigation to see if I can use RDDs instead of
>> dataframes.
>>
>> java.lang.NoSuchMethodError:
>> org.apache.avro.Schema$Parser.parse(Ljava/lang/String;[Ljava/lang/String;)Lorg/apache/avro/Schema;
>> at ophan.thrift.event.Event.(Event.java:10)
>> at SimpleApp$.main(SimpleApp.scala:25)
>> at SimpleApp.main(SimpleApp.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
>> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
>> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>> Thanks,
>>
>> Tom
>>
>>
>>


Re: Partitioning a RDD for training multiple classifiers

2015-09-09 Thread Maximo Gurmendez
Thanks Ben for your answer. I'll explore what happens under the hood in a
DataFrame.

Regarding the ability to split a large RDD into n RDDs without requiring n
passes over the large RDD: can partitionBy() help? If I partition by a key that
corresponds to the split criteria (i.e. client id) and then cache each of
those RDDs, will that lessen the effect of repeated large traversals (since
Spark will figure out that for each smaller RDD it just needs to traverse a
subset of the partitions)?

Thanks!
   Máximo
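
A rough sketch of the partition-then-cache idea (the bigRdd name comes from the
original question below; the clientId field and partition count are
illustrative). Note that a plain filter still visits every partition, but once
the keyed RDD is cached those passes scan memory instead of recomputing the
lineage from the source:

import org.apache.spark.HashPartitioner

// Key the big RDD by client id and co-locate each client's records.
val byClient = bigRdd.map(r => (r.clientId, r))
                     .partitionBy(new HashPartitioner(64))
                     .persist()

// Each per-client RDD is then a cheap filter over the cached, partitioned data.
val clientA = byClient.filter { case (id, _) => id == "clientA" }.values
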


On Sep 8, 2015, at 11:32 AM, Ben Tucker 
> wrote:

Hi Maximo —

This is a relatively naive answer, but I would consider structuring the RDD 
into a DataFrame, then saving the 'splits' using something like 
df.write.partitionBy("client").parquet(hdfs_path). You could then read 
a DataFrame from each resulting parquet directory and do your per-client work 
from these. You mention re-using the splits, so this solution might be worth 
the file-writing time.

Does anyone know of a method that gets a collection of DataFrames — one for 
each partition, in the byPartition=('client') sense — from a 'big' DataFrame? 
Basically, the equivalent of writing by partition and creating a DataFrame for 
each result, but skipping the HDFS step.
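
A hedged sketch of the write-by-partition step using the DataFrameWriter API
(available from roughly Spark 1.4; the path and the 'client' column name are
illustrative):

// Each distinct client value lands in its own sub-directory,
// e.g. hdfs:///tmp/byClient/client=acme/part-*.parquet
df.write.partitionBy("client").parquet("hdfs:///tmp/byClient")

// Per-client work can then read back just that slice.
val clientDf = sqlContext.read.parquet("hdfs:///tmp/byClient/client=acme")
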


On Tue, Sep 8, 2015 at 10:47 AM, Maximo Gurmendez 
> wrote:
Hi,
I have a RDD that needs to be split (say, by client) in order to train n 
models (i.e. one for each client). Since most of the classifiers that come with 
ml-lib only can accept an RDD as input (and cannot build multiple models in one 
pass - as I understand it), the only way to train n separate models is to 
create n RDDs (by filtering the original one).

Conceptually:

rdd1,rdd2,rdd3 = splitRdds(bigRdd)

The function splitRdds would use the standard filter mechanism. I would then 
need to submit n training Spark jobs. When I do this, will it mean that it will 
traverse the bigRdd n times? Is there a better way to persist the split RDDs 
(i.e. save the split RDDs in a cache)?

I could cache the bigRdd, but I'm not sure that would be very efficient either, 
since it will require the same number of passes anyway (I think - but I'm 
relatively new to Spark). Also, I'm planning on reusing the individual splits 
(rdd1, rdd2, etc.), so it would be convenient to have them individually cached.

Another problem is that the splits could be very skewed (i.e. one split 
could represent a large percentage of the original bigRdd). So saving the 
split RDDs to disk (at least, naively) could be a challenge.

Is there any better way of doing this?

Thanks!
   Máximo





Re: long running Spark Streaming job and eventlog files

2015-09-09 Thread jarod7736
I believe I just added more disk space, and wrote a script to clean up 
the files after a few days.  Not really a good resolution, but it works.

> jamespowenjr [via Apache Spark User List] 
> 
> September 8, 2015 at 12:55 PM
> Jarod,
>   I'm having the same issue right now. Did you ever find a resolution?
>
>
>
>
>







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/long-running-Spark-Streaming-job-and-eventlog-files-tp21976p24621.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

[Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

2015-09-09 Thread shahab
 Hi,
I am using Spark on Amazon EMR. So far I have not succeeded in submitting the
application; I am not sure what the problem is. In the log file I see
the following:
java.io.FileNotFoundException: File does not exist:
hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

However, even putting spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar into the
fat jar didn't solve the problem. I am out of clues now.
I want to submit a Spark application as a step, using the AWS web console. I
submit the application as: spark-submit --deploy-mode cluster --class
mypack.MyMainClass --master yarn-cluster s3://mybucket/MySparkApp.jar
Is there anyone who has had a similar problem with EMR?

best,
/Shahab


Re: Event logging not working when worker machine terminated

2015-09-09 Thread David Rosenstrauch

Standalone.

On 09/08/2015 11:18 PM, Jeff Zhang wrote:

What cluster mode do you use ? Standalone/Yarn/Mesos ?


On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch 
wrote:


Our Spark cluster is configured to write application history event logging
to a directory on HDFS.  This all works fine.  (I've tested it with Spark
shell.)

However, on a large, long-running job that we ran tonight, one of our
machines at the cloud provider had issues and had to be terminated and
replaced in the middle of the job.

The job completed correctly, and shows in state FINISHED in the "Completed
Applications" section of the Spark GUI.  However, when I try to look at the
application's history, the GUI says "Application history not found" and
"Application ... is still in progress".

The reason appears to be the machine that was terminated.  When I click on
the executor list for that job, Spark is showing the executor from the
terminated machine as still in state RUNNING.

Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.

Thanks,

DR

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org








-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: No auto decompress in Spark Java textFile function?

2015-09-09 Thread Chris Teoh
Thanks. What I noticed was that the decompression works if the file is in HDFS
but not when it is a local file in a development environment.

Does anyone else have the same problem?
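
For the .tar.bz2 case specifically, one hedged option (different from the
wholeTextFiles suggestion below, since tar entries are binary) is sc.binaryFiles
plus Apache Commons Compress, assuming commons-compress is on the classpath and
entries fit in an Int-sized buffer:

import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream
import org.apache.commons.compress.utils.IOUtils

// One (archivePath, entryName, contents) record per file inside each archive.
val records = sc.binaryFiles("hdfs:///data/*.tar.bz2").flatMap {
  case (path, stream) =>
    val tar = new TarArchiveInputStream(
      new BZip2CompressorInputStream(stream.open()))
    Iterator.continually(tar.getNextTarEntry)
      .takeWhile(_ != null)
      .filter(_.isFile)
      .map { entry =>
        val bytes = new Array[Byte](entry.getSize.toInt)
        IOUtils.readFully(tar, bytes)
        (path, entry.getName, new String(bytes, "UTF-8"))
      }
      .toList   // materialise before the underlying stream goes out of scope
}
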
On Wed, 9 Sep 2015 at 4:40 pm Akhil Das  wrote:

> textFile used to work with .gz files; I haven't tested it on bz2 files. If
> it isn't decompressing by default, then what you have to do is use
> sc.wholeTextFiles and then decompress each record (each record being a file)
> with the corresponding codec.
>
> Thanks
> Best Regards
>
> On Tue, Sep 8, 2015 at 6:49 PM, Chris Teoh  wrote:
>
>> Hi Folks,
>>
>> I tried using Spark v1.2 on bz2 files in Java, but the behaviour is
>> different from the same textFile API call in Python and Scala.
>>
>> That being said, how do I read .tar.bz2 files in Spark's Java
>> API?
>>
>> Thanks in advance
>> Chris
>>
>
>


Re: [streaming] DStream with window performance issue

2015-09-09 Thread Cody Koeninger
It looked like from your graphs that you had a 10 second batch time, but
that your processing time was consistently 11 seconds.  If that's correct,
then yes your delay is going to keep growing.  You'd need to either
increase your batch time, or get your processing time down (either by
adding more resources or changing your code).

I'd expect adding a repartition / shuffle to increase processing time, not
decrease it.  What are you seeing after adding the partitionBy call?

On Tue, Sep 8, 2015 at 5:33 PM, Понькин Алексей  wrote:

> Oh my, I implemented one directStream instead of a union of three, but it is
> still growing exponentially with the window method.
>
> --
> Яндекс.Почта — надёжная почта
> http://mail.yandex.ru/neo2/collect/?exp=1=1
>
>
> 08.09.2015, 23:53, "Cody Koeninger" :
> > Yeah, that's the general idea.
> >
> > When you say hard code topic name, do you mean  Set(topicA, topicB,
> topicB) ?  You should be able to use a variable for that - read it from a
> config file, whatever.
> >
> > If you're talking about the match statement, yeah you'd need to hardcode
> your cases.
> >
> > On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей 
> wrote:
> >> Ok. I got it!
> >> But it seems that I need to hard code topic name.
> >>
> >> something like that?
> >>
> >> val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte],
> DefaultDecoder, DefaultDecoder](
> >>   ssc, kafkaParams, Set(topicA, topicB, topicB))
> >>   .transform{ rdd =>
> >> val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
> >> rdd.mapPartitionsWithIndex(
> >>   (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
> >> offsetRange(idx).topic match {
> >>   case "topicA" => ...
> >>   case "topicB" => ...
> >>   case _ => 
> >> }
> >>  )
> >> }
> >>
> >> 08.09.2015, 19:21, "Cody Koeninger" :
> >>> That doesn't really matter.  With the direct stream you'll get all
> objects for a given topicpartition in the same spark partition.  You know
> what topic it's from via hasOffsetRanges.  Then you can deserialize
> appropriately based on topic.
> >>>
> >>> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей 
> wrote:
>  The thing is, that these topics contain absolutely different AVRO
> objects(Array[Byte]) that I need to deserialize to different Java(Scala)
> objects, filter and then map to tuple (String, String). So i have 3 streams
> with different avro object in there. I need to cast them(using some
> business rules) to pairs and unite.
> 
>  --
>  Яндекс.Почта — надёжная почта
>  http://mail.yandex.ru/neo2/collect/?exp=1=1
> 
>  08.09.2015, 19:11, "Cody Koeninger" :
> 
> > I'm not 100% sure what's going on there, but why are you doing a
> union in the first place?
> >
> > If you want multiple topics in a stream, just pass them all in the
> set of topics to one call to createDirectStream
> >
> > On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin 
> wrote:
> >> Ok.
> >> Spark 1.4.1 on yarn
> >>
> >> Here is my application
> >> I have 4 different Kafka topics(different object streams)
> >>
> >> type Edge = (String,String)
> >>
> >> val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter(
> nonEmpty ).map( toEdge )
> >> val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter(
> nonEmpty ).map( toEdge )
> >> val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter(
> nonEmpty ).map( toEdge )
> >>
> >> val u = a union b union c
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter(
> nonEmpty ).map( toEdge )
> >>
> >> val joinResult = source.rightOuterJoin( z )
> >> joinResult.foreachRDD { rdd=>
> >>   rdd.foreachPartition { partition =>
> >>   // save to result topic in kafka
> >>}
> >>  }
> >>
> >> The 'window' function in the code above is constantly growing,
> >> no matter how many events appeared in corresponding kafka topics
> >>
> >> but if I change one line from
> >>
> >> val source = u.window(Seconds(600), Seconds(10))
> >>
> >> to
> >>
> >> val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))
> >>
> >> val source = u.transform(_.partitionBy(partitioner.value)
> ).window(Seconds(600), Seconds(10))
> >>
> >> Everything works perfect.
> >>
> >> Perhaps the problem was in WindowedDStream
> >>
> >> I forced to use PartitionerAwareUnionRDD( partitionBy the same
> partitioner ) instead of UnionRDD.
> >>
> >> Nonetheless I did not see any hints about such a behaviour in the docs.
> >> Is it a bug or absolutely normal behaviour?
> >>
> >> 08.09.2015, 17:03, "Cody 

JNI issues with mesos

2015-09-09 Thread Adrian Bridgett
I'm trying to run spark (1.4.1) on top of mesos (0.23).  I've followed 
the instructions (uploaded spark tarball to HDFS, set executor uri in 
both places etc) and yet on the slaves it's failing to launch even the 
SparkPi example with a JNI error.  It does run with a local master.  A 
day of debugging later and it's time to ask for help!


 bin/spark-submit --master mesos://10.1.201.191:5050 --class 
org.apache.spark.examples.SparkPi /tmp/examples.jar


(I'm putting the jar outside hdfs  - on both client box + slave (turned 
off other slaves for debugging) - due to 
http://apache-spark-user-list.1001560.n3.nabble.com/Remote-jar-file-td20649.html. 
I should note that I had the same JNI errors when using the mesos 
cluster dispatcher).


I'm using Oracle Java 8 (no other java - even openjdk - is installed)

As you can see, the slave is downloading the framework fine (you can 
even see it extracted on the slave).  Can anyone shed some light on 
what's going on - e.g. how is it attempting to run the executor?


I'm going to try a different JVM (and try a custom spark distribution) 
but I suspect that the problem is much more basic. Maybe it can't find 
the hadoop native libs?


Any light would be much appreciated :)  I've included the slaves's 
stderr below:


I0909 14:14:01.405185 32132 logging.cpp:177] Logging to STDERR
I0909 14:14:01.405256 32132 fetcher.cpp:409] Fetcher Info: 
{"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/20150826-133446-3217621258-5050-4064-S0\/ubuntu","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"hdfs:\/\/\/apps\/spark\/spark.tgz"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/20150826-133446-3217621258-5050-4064-S0\/frameworks\/20150826-133446-3217621258-5050-4064-211198\/executors\/20150826-133446-3217621258-5050-4064-S0\/runs\/38077da2-553e-4888-bfa3-ece2ab2119f3","user":"ubuntu"}
I0909 14:14:01.406332 32132 fetcher.cpp:364] Fetching URI 
'hdfs:///apps/spark/spark.tgz'
I0909 14:14:01.406344 32132 fetcher.cpp:238] Fetching directly into the 
sandbox directory
I0909 14:14:01.406358 32132 fetcher.cpp:176] Fetching URI 
'hdfs:///apps/spark/spark.tgz'
I0909 14:14:01.679055 32132 fetcher.cpp:104] Downloading resource with 
Hadoop client from 'hdfs:///apps/spark/spark.tgz' to 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz'
I0909 14:14:05.492626 32132 fetcher.cpp:76] Extracting with command: tar 
-C 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3' 
-xf 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz'
I0909 14:14:07.489753 32132 fetcher.cpp:84] Extracted 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz' 
into 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3'
W0909 14:14:07.489784 32132 fetcher.cpp:260] Copying instead of 
extracting resource from URI with 'extract' flag, because it does not 
seem to be an archive: hdfs:///apps/spark/spark.tgz
I0909 14:14:07.489791 32132 fetcher.cpp:441] Fetched 
'hdfs:///apps/spark/spark.tgz' to 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz'
Error: A JNI error has occurred, please check your installation and try 
again

Exception in thread "main" java.lang.NoClassDefFoundError: org/slf4j/Logger
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at 
sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at 
sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)

Caused by: java.lang.ClassNotFoundException: org.slf4j.Logger
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 7 more




spark.kryo.registrationRequired: Tuple2 is not registered

2015-09-09 Thread Marius Soutier
Hi all,

as indicated in the title, I’m using Kryo with a custom Kryo serializer, but as 
soon as I enable `spark.kryo.registrationRequired`, my Spark Streaming job 
fails to start with this exception:

Class is not registered: scala.collection.immutable.Range

When I register it, it continues with Tuple2, which I cannot serialize sanely 
for all specialized forms. According to the documentation, this should be 
handled by Chill. Is this a bug or what am I missing?

I’m using Spark 1.4.1.


Cheers
- Marius
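
For reference, a sketch of registering the offending classes up front via
SparkConf (the list below is illustrative; the specialized Tuple2 variants with
mangled names are what make exhaustive registration painful):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(
    classOf[scala.collection.immutable.Range],
    classOf[Tuple2[_, _]],
    classOf[Array[Tuple2[_, _]]],
    // Specialized forms have mangled names and need Class.forName:
    Class.forName("scala.Tuple2$mcII$sp")))
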


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



spark history server + yarn log aggregation issue

2015-09-09 Thread michael.england
Hi,

I am running Spark-on-YARN on a secure cluster with yarn log aggregation set 
up. Once a job completes, when viewing stdout/stderr executor logs in the Spark 
history server UI it redirects me to the local nodemanager where a page appears 
for a second saying ‘Redirecting to log server….’ and then redirects me to the 
aggregated job history server log page. However, the aggregated job history 
page sends me to 
http://:/jobhistory..
 Instead of https://..., causing odd characters to appear. If you manually 
specify https:// in the URL, this works as expected, however the page 
automatically refreshes and causes this to go back to http again.

I have set the yarn.log.server.url property in yarn-site.xml to include https:


<property>
  <name>yarn.log.server.url</name>
  <value>https://.domain.com:port/jobhistory/logs/</value>
</property>

I know this isn’t a Spark issue specifically, but I wondered if anyone else has 
experienced this issue and knows how to get around it?

Thanks,
Mike


This e-mail (including any attachments) is private and confidential, may 
contain proprietary or privileged information and is intended for the named 
recipient(s) only. Unintended recipients are strictly prohibited from taking 
action on the basis of information in this e-mail and must contact the sender 
immediately, delete this e-mail (and all attachments) and destroy any hard 
copies. Nomura will not accept responsibility or liability for the accuracy or 
completeness of, or the presence of any virus or disabling code in, this 
e-mail. If verification is sought please request a hard copy. Any reference to 
the terms of executed transactions should be treated as preliminary only and 
subject to formal written confirmation by Nomura. Nomura reserves the right to 
retain, monitor and intercept e-mail communications through its networks 
(subject to and in accordance with applicable laws). No confidentiality or 
privilege is waived or lost by Nomura by any mistransmission of this e-mail. 
Any reference to "Nomura" is a reference to any entity in the Nomura Holdings, 
Inc. group. Please read our Electronic Communications Legal Notice which forms 
part of this e-mail: http://www.Nomura.com/email_disclaimer.htm



Re: Java vs. Scala for Spark

2015-09-09 Thread Cody Koeninger
Java 8 lambdas are broken to the point of near-uselessness (because of
checked exceptions and inability to close over non-final references).  I
wouldn't use them as a deciding factor in language choice.

Any competent developer should be able to write reasonable java-in-scala
after a week and reading a copy of "Scala for the Impatient"

On Tue, Sep 8, 2015 at 11:15 AM, Jerry Lam  wrote:

> Hi Bryan,
>
> I would choose a language based on the requirements. It may not make
> sense if you have a lot of dependencies that are Java-based components, and
> interoperability between Java and Scala is not always obvious.
>
> I agree with the above comments that Java is much more verbose than Scala
> in many cases if not all. However, I personally don't find the verbosity is
> a key factor in choosing a language. For the sake of argument, will you be
> discouraged if you need to write 3 lines of Java for 1 line of scala? I
> really don't care the number of lines as long as I can finish the task
> within a period of time.
>
> I believe, and please correct me if I'm wrong, that all Spark functionality
> you can find in Scala is also available in Java; that includes MLlib,
> Spark SQL, streaming, etc. So you won't miss any features of Spark by using
> Java.
>
> It seems the questions should be
> - what language are the developers comfortable with?
> - what are the components in the system that will constrain the choice of
> the language?
>
> Best Regards,
>
> Jerry
>
> On Tue, Sep 8, 2015 at 11:59 AM, Dean Wampler 
> wrote:
>
>> It's true that Java 8 lambdas help. If you've read Learning Spark, where
>> they use Java 7, Python, and Scala for the examples, it really shows how
>> awful Java without lambdas is for Spark development.
>>
>> Still, there are several "power tools" in Scala I would sorely miss using
>> Java 8:
>>
>> 1. The REPL (interpreter): I do most of my work in the REPL, then move
>> the code to compiled code when I'm ready to turn it into a batch job. Even
>> better, use Spark Notebook ! (and on GitHub
>> ).
>> 2. Tuples: It's just too convenient to use tuples for schemas, return
>> values from functions, etc., etc., etc.,
>> 3. Pattern matching: This has no analog in Java, so it's hard to
>> appreciate it until you understand it, but see this example
>> 
>> for a taste of how concise it makes code!
>> 4. Type inference: Spark really shows its utility. It means a lot less
>> code to write, but you get the hints of what you just wrote!
>>
>> My $0.02.
>>
>> dean
>>
>>
>> Dean Wampler, Ph.D.
>> Author: Programming Scala, 2nd Edition
>>  (O'Reilly)
>> Typesafe 
>> @deanwampler 
>> http://polyglotprogramming.com
>>
>> On Tue, Sep 8, 2015 at 10:28 AM, Igor Berman 
>> wrote:
>>
>>> we are using Java 7; it's much more verbose than the Java 8 or Scala examples.
>>> In addition, there are sometimes libraries that have no Java API, so you need
>>> to write wrappers for them yourself (e.g. GraphX).
>>> On the other hand, Scala is not a trivial language like Java, so it
>>> depends on your team.
>>>
>>> On 8 September 2015 at 17:44, Bryan Jeffrey 
>>> wrote:
>>>
 Thank you for the quick responses.  It's useful to have some insight
 from folks already extensively using Spark.

 Regards,

 Bryan Jeffrey

 On Tue, Sep 8, 2015 at 10:28 AM, Sean Owen  wrote:

> Why would Scala vs Java performance be different Ted? Relatively
> speaking there is almost no runtime difference; it's the same APIs or
> calls via a thin wrapper. Scala/Java vs Python is a different story.
>
> Java libraries can be used in Scala. Vice-versa too, though calling
> Scala-generated classes can be clunky in Java. What's your concern
> about interoperability Jeffrey?
>
> I disagree that Java 7 vs Scala usability is sooo different, but it's
> certainly much more natural to use Spark in Scala. Java 8 closes a lot
> of the usability gap with Scala, but not all of it. Enough that it's
> not crazy for a Java shop to stick to Java 8 + Spark and not be at a
> big disadvantage.
>
> The downsides of Scala IMHO are that it provides too much: lots of
> nice features (closures! superb collections!), lots of rope to hang
> yourself too (implicits sometimes!) and some WTF features (XML
> literals!) Learning the good useful bits of Scala isn't hard. You can
> always write Scala code as much like Java as you like, I find.
>
> Scala tooling is different from Java tooling; that's an
> underappreciated barrier. For example I think SBT is good for
> development, bad for general 

Re: [streaming] DStream with window performance issue

2015-09-09 Thread Понькин Алексей
That's correct, I have a 10 second batch.
The problem is actually in the processing time: it is increasing constantly no 
matter how small or large my window duration is.
I am trying to prepare some example code to clarify my use case.

-- 
Яндекс.Почта — надёжная почта
http://mail.yandex.ru/neo2/collect/?exp=1=1


09.09.2015, 17:04, "Cody Koeninger" :
> It looked like from your graphs that you had a 10 second batch time, but that 
> your processing time was consistently 11 seconds.  If that's correct, then 
> yes your delay is going to keep growing.  You'd need to either increase your 
> batch time, or get your processing time down (either by adding more resources 
> or changing your code).
>
> I'd expect adding a repartition / shuffle to increase processing time, not 
> decrease it.  What are you seeing after adding the partitionBy call?
>
> On Tue, Sep 8, 2015 at 5:33 PM, Понькин Алексей  wrote:
>> Oh my, I implemented one directStream instead of union of three but it is 
>> still growing exponential with window method.
>>
>> --
>> Яндекс.Почта — надёжная почта
>> http://mail.yandex.ru/neo2/collect/?exp=1=1
>>
>> 08.09.2015, 23:53, "Cody Koeninger" :
>>
>>> Yeah, that's the general idea.
>>>
>>> When you say hard code topic name, do you mean  Set(topicA, topicB, topicB) 
>>> ?  You should be able to use a variable for that - read it from a config 
>>> file, whatever.
>>>
>>> If you're talking about the match statement, yeah you'd need to hardcode 
>>> your cases.
>>>
>>> On Tue, Sep 8, 2015 at 3:49 PM, Понькин Алексей  wrote:
 Ok. I got it!
 But it seems that I need to hard code topic name.

 something like that?

 val source = KafkaUtils.createDirectStream[Array[Byte], Array[Byte], 
 DefaultDecoder, DefaultDecoder](
   ssc, kafkaParams, Set(topicA, topicB, topicB))
   .transform{ rdd =>
     val offsetRange = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
     rdd.mapPartitionsWithIndex(
       (idx: Int, itr: Iterator[(Array[Byte], Array[Byte])]) =>
         offsetRange(idx).topic match {
           case "topicA" => ...
           case "topicB" => ...
           case _ => 
         }
      )
     }

 08.09.2015, 19:21, "Cody Koeninger" :
> That doesn't really matter.  With the direct stream you'll get all 
> objects for a given topicpartition in the same spark partition.  You know 
> what topic it's from via hasOffsetRanges.  Then you can deserialize 
> appropriately based on topic.
>
> On Tue, Sep 8, 2015 at 11:16 AM, Понькин Алексей  
> wrote:
>> The thing is, that these topics contain absolutely different AVRO 
>> objects(Array[Byte]) that I need to deserialize to different Java(Scala) 
>> objects, filter and then map to tuple (String, String). So i have 3 
>> streams with different avro object in there. I need to cast them(using 
>> some business rules) to pairs and unite.
>>
>> --
>> Яндекс.Почта — надёжная почта
>> http://mail.yandex.ru/neo2/collect/?exp=1=1
>>
>> 08.09.2015, 19:11, "Cody Koeninger" :
>>
>>> I'm not 100% sure what's going on there, but why are you doing a union 
>>> in the first place?
>>>
>>> If you want multiple topics in a stream, just pass them all in the set 
>>> of topics to one call to createDirectStream
>>>
>>> On Tue, Sep 8, 2015 at 10:52 AM, Alexey Ponkin  
>>> wrote:
 Ok.
 Spark 1.4.1 on yarn

 Here is my application
 I have 4 different Kafka topics(different object streams)

 type Edge = (String,String)

 val a = KafkaUtils.createDirectStream[...](sc,"A",params).filter( 
 nonEmpty ).map( toEdge )
 val b = KafkaUtils.createDirectStream[...](sc,"B",params).filter( 
 nonEmpty ).map( toEdge )
 val c = KafkaUtils.createDirectStream[...](sc,"C",params).filter( 
 nonEmpty ).map( toEdge )

 val u = a union b union c

 val source = u.window(Seconds(600), Seconds(10))

 val z = KafkaUtils.createDirectStream[...](sc,"Z",params).filter( 
 nonEmpty ).map( toEdge )

 val joinResult = source.rightOuterJoin( z )
 joinResult.foreachRDD { rdd=>
   rdd.foreachPartition { partition =>
       // save to result topic in kafka
    }
  }

 The 'window' function in the code above is constantly growing,
 no matter how many events appeared in corresponding kafka topics

 but if I change one line from

 val source = u.window(Seconds(600), Seconds(10))

 to

 val partitioner = ssc.sparkContext.broadcast(new HashPartitioner(8))


Loading json data into Pair RDD in Spark using java

2015-09-09 Thread prachicsa


I am very new to Spark.

I have a very basic question. I read a file into a Spark RDD in which each line
is a JSON object. I want to apply groupBy-like transformations. So I want to
transform each JSON line into a PairRDD. Is there a straightforward way to
do it in Java?

My json is like this:

{
"tmpl": "p",
"bw": "874",
"aver": {"cnac": "US","t1": "2"},
}

Currently, the way I am trying is to split by ',' first and then by ':'. Is
there any more straightforward way to do this?

My current code:

val pairs = setECrecords.flatMap(x => (x.split(",")))
pairs.foreach(println)

val pairsastuple = pairs.map(x => if(x.split("=").length>1)
(x.split("=")(0), x.split("=")(1)) else (x.split("=")(0), x))
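
A hedged alternative to splitting on commas and colons, in Scala like the
current code above: let Spark SQL parse each line and key the resulting rows
(column names follow the sample record; the input path is illustrative and each
line is assumed to be valid JSON):

// One JSON object per line; read.json infers the schema, including the
// nested "aver" struct.
val df = sqlContext.read.json("hdfs:///data/records.json")

// Key each record by "tmpl" to get a pair RDD suitable for groupBy-style work.
val pairs = df.select("tmpl", "bw").rdd
              .map(row => (row.getString(0), row.getString(1)))
val grouped = pairs.groupByKey()
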





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Loading-json-data-into-Pair-RDD-in-Spark-using-java-tp24624.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Event logging not working when worker machine terminated

2015-09-09 Thread Charles Chao
I have encountered the same problem after migrating from 1.2.2 to 1.3.0.
After some searching this appears to be a bug introduced in 1.3. Hopefully
it's fixed in 1.4.

Thanks, 

Charles





On 9/9/15, 7:30 AM, "David Rosenstrauch"  wrote:

>Standalone.
>
>On 09/08/2015 11:18 PM, Jeff Zhang wrote:
>> What cluster mode do you use ? Standalone/Yarn/Mesos ?
>>
>>
>> On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch 
>> wrote:
>>
>>> Our Spark cluster is configured to write application history event
>>>logging
>>> to a directory on HDFS.  This all works fine.  (I've tested it with
>>>Spark
>>> shell.)
>>>
>>> However, on a large, long-running job that we ran tonight, one of our
>>> machines at the cloud provider had issues and had to be terminated and
>>> replaced in the middle of the job.
>>>
>>> The job completed correctly, and shows in state FINISHED in the
>>>"Completed
>>> Applications" section of the Spark GUI.  However, when I try to look
>>>at the
>>> application's history, the GUI says "Application history not found" and
>>> "Application ... is still in progress".
>>>
>>> The reason appears to be the machine that was terminated.  When I
>>>click on
>>> the executor list for that job, Spark is showing the executor from the
>>> terminated machine as still in state RUNNING.
>>>
>>> Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.
>>>
>>> Thanks,
>>>
>>> DR
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>>
>
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Event logging not working when worker machine terminated

2015-09-09 Thread David Rosenstrauch
Thanks for the info.  Do you know if there's a ticket already open for 
this issue?  If so, I'd like to monitor it.


Thanks,

DR

On 09/09/2015 11:50 AM, Charles Chao wrote:

I have encountered the same problem after migrating from 1.2.2 to 1.3.0.
After some searching this appears to be a bug introduced in 1.3. Hopefully
it's fixed in 1.4.

Thanks,

Charles





On 9/9/15, 7:30 AM, "David Rosenstrauch"  wrote:


Standalone.

On 09/08/2015 11:18 PM, Jeff Zhang wrote:

What cluster mode do you use ? Standalone/Yarn/Mesos ?


On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch 
wrote:


Our Spark cluster is configured to write application history event
logging
to a directory on HDFS.  This all works fine.  (I've tested it with
Spark
shell.)

However, on a large, long-running job that we ran tonight, one of our
machines at the cloud provider had issues and had to be terminated and
replaced in the middle of the job.

The job completed correctly, and shows in state FINISHED in the
"Completed
Applications" section of the Spark GUI.  However, when I try to look
at the
application's history, the GUI says "Application history not found" and
"Application ... is still in progress".

The reason appears to be the machine that was terminated.  When I
click on
the executor list for that job, Spark is showing the executor from the
terminated machine as still in state RUNNING.

Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.

Thanks,

DR

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org








-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org







Re: Loading json data into Pair RDD in Spark using java

2015-09-09 Thread Ted Yu
Please take a look at the example in SPARK-10287

FYI

On Wed, Sep 9, 2015 at 8:50 AM, prachicsa  wrote:

>
>
> I am very new to Spark.
>
> I have a very basic question. I read a file in Spark RDD in which each line
> is a JSON. I want to make apply groupBy like transformations. So I want to
> transform each JSON line into a PairRDD. Is there a straight forward way to
> do it in Java?
>
> My json is like this:
>
> {
> "tmpl": "p",
> "bw": "874",
> "aver": {"cnac": "US","t1": "2"},
> }
>
> Currently, the way I am trying is the to split by , first and then by :. Is
> there any straight forward way to do this?
>
> My current code:
>
> val pairs = setECrecords.flatMap(x => (x.split(",")))
> pairs.foreach(println)
>
> val pairsastuple = pairs.map(x => if(x.split("=").length>1)
> (x.split("=")(0), x.split("=")(1)) else (x.split("=")(0), x))
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Loading-json-data-into-Pair-RDD-in-Spark-using-java-tp24624.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: JNI issues with mesos

2015-09-09 Thread Adrian Bridgett

5mins later...

Trying 1.5 with a fairly plain build:
./make-distribution.sh --tgz --name os1 -Phadoop-2.6

and on my first attempt stderr showed:
I0909 15:16:49.392144  1619 fetcher.cpp:441] Fetched 
'hdfs:///apps/spark/spark15.tgz' to 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S1/frameworks/20150826-133446-3217621258-5050-4064-211204/executors/20150826-133446-3217621258-5050-4064-S1/runs/43026ba8-6624-4817-912c-3d7573433102/spark15.tgz'

sh: 1: cd: can't cd to spark15.tgz
sh: 1: ./bin/spark-class: not found

Aha, let's rename the file in hdfs (and the two configs) from 
spark15.tgz to spark-1.5.0-bin-os1.tgz...

Success!!!

The same trick with 1.4 doesn't work, but now that I have something that 
does I can make progress.


Hopefully this helps someone else :-)

Adrian
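
The detail that appears to matter is that the fetched archive must untar into a
directory matching the name Spark derives from the URI (the standard
spark-<version>-bin-<name> layout). A hedged sketch of the corresponding
setting, using the paths from this thread:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("mesos://10.1.201.191:5050")
  // Point at a tarball whose top-level directory matches its basename,
  // e.g. spark-1.5.0-bin-os1.tgz extracting to spark-1.5.0-bin-os1/
  .set("spark.executor.uri", "hdfs:///apps/spark/spark-1.5.0-bin-os1.tgz")
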

On 09/09/2015 16:59, Adrian Bridgett wrote:
I'm trying to run spark (1.4.1) on top of mesos (0.23).  I've followed 
the instructions (uploaded spark tarball to HDFS, set executor uri in 
both places etc) and yet on the slaves it's failing to lauch even the 
SparkPi example with a JNI error.  It does run with a local master.  A 
day of debugging later and it's time to ask for help!


 bin/spark-submit --master mesos://10.1.201.191:5050 --class 
org.apache.spark.examples.SparkPi /tmp/examples.jar


(I'm putting the jar outside hdfs  - on both client box + slave 
(turned off other slaves for debugging) - due to 
http://apache-spark-user-list.1001560.n3.nabble.com/Remote-jar-file-td20649.html. 
I should note that I had the same JNI errors when using the mesos 
cluster dispatcher).


I'm using Oracle Java 8 (no other java - even openjdk - is installed)

As you can see, the slave is downloading the framework fine (you can 
even see it extracted on the slave).  Can anyone shed some light on 
what's going on - e.g. how is it attempting to run the executor?


I'm going to try a different JVM (and try a custom spark distribution) 
but I suspect that the problem is much more basic. Maybe it can't find 
the hadoop native libs?


Any light would be much appreciated :)  I've included the slaves's 
stderr below:


I0909 14:14:01.405185 32132 logging.cpp:177] Logging to STDERR
I0909 14:14:01.405256 32132 fetcher.cpp:409] Fetcher Info: 
{"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/20150826-133446-3217621258-5050-4064-S0\/ubuntu","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"hdfs:\/\/\/apps\/spark\/spark.tgz"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/20150826-133446-3217621258-5050-4064-S0\/frameworks\/20150826-133446-3217621258-5050-4064-211198\/executors\/20150826-133446-3217621258-5050-4064-S0\/runs\/38077da2-553e-4888-bfa3-ece2ab2119f3","user":"ubuntu"}
I0909 14:14:01.406332 32132 fetcher.cpp:364] Fetching URI 
'hdfs:///apps/spark/spark.tgz'
I0909 14:14:01.406344 32132 fetcher.cpp:238] Fetching directly into 
the sandbox directory
I0909 14:14:01.406358 32132 fetcher.cpp:176] Fetching URI 
'hdfs:///apps/spark/spark.tgz'
I0909 14:14:01.679055 32132 fetcher.cpp:104] Downloading resource with 
Hadoop client from 'hdfs:///apps/spark/spark.tgz' to 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz'
I0909 14:14:05.492626 32132 fetcher.cpp:76] Extracting with command: 
tar -C 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3' 
-xf 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz'
I0909 14:14:07.489753 32132 fetcher.cpp:84] Extracted 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz' 
into 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3'
W0909 14:14:07.489784 32132 fetcher.cpp:260] Copying instead of 
extracting resource from URI with 'extract' flag, because it does not 
seem to be an archive: hdfs:///apps/spark/spark.tgz
I0909 14:14:07.489791 32132 fetcher.cpp:441] Fetched 
'hdfs:///apps/spark/spark.tgz' to 
'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz'
Error: A JNI error has occurred, please check your installation and 
try again
Exception in thread "main" java.lang.NoClassDefFoundError: 
org/slf4j/Logger

at 

Re: New to Spark - Paritioning Question

2015-09-09 Thread Richard Marscher
Ah I see. In that case, the groupByKey function does guarantee every key is
on exactly one partition matched with the aggregated data. This can be
improved depending on what you want to do after. Group by key only
aggregates the data after shipping it across the cluster. Meanwhile, using
reduceByKey will do aggregation on each node first, then ship those results
to the final node and partition to finalize the aggregation there. If that
makes sense.

So say Node 1 has pairs: (a, 1), (b, 2), (b, 6)
Node 2 has pairs: (a, 2), (a,3), (b, 4)

groupByKey would send both the a pairs and the b pairs across the network. If
you did reduceByKey with a sum aggregate, then you'd expect it to ship (b, 8)
from Node 1 and (a, 5) from Node 2, since it did the local aggregation first.

You are correct that doing something with expensive side-effects like
writing to a database (connections and network + I/O) is best done with the
mapPartitions or foreachPartition type of functions on RDD so you can share
a database connection and also potentially do things like batch statements.
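
The Node 1 / Node 2 example above, as a runnable sketch (assumes an existing
SparkContext named sc; the partition count is arbitrary):

import org.apache.spark.HashPartitioner

// Local aggregation happens on each node before the shuffle, and each key
// ends up in exactly one output partition of the given partitioner.
val pairs = sc.parallelize(
  Seq(("a", 1), ("b", 2), ("b", 6), ("a", 2), ("a", 3), ("b", 4)))
val sums = pairs.reduceByKey(new HashPartitioner(4), _ + _)
sums.collect()   // Array((a,6), (b,12)), order may vary
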


On Tue, Sep 8, 2015 at 7:37 PM, Mike Wright  wrote:

> Thanks for the response!
>
> Well, in retrospect each partition doesn't need to be restricted to a
> single key. But I cannot have the values associated with a key span partitions,
> since they all need to be processed together for a key to facilitate
> cumulative calcs. So provided an individual key has all its values in a
> single partition, I'm OK.
>
> Additionally, the values will be written to the database, and from what I
> have read, doing this at the partition level is the best compromise between
> 1) writing the calculated values for each key (lots of connect/disconnects)
> and 2) collecting them all at the end and writing them all at once.
>
> I am using a groupBy against the filtered RDD to get the grouping I want,
> but apparently this may not be the most efficient way, and it seems that
> everything always ends up in a single partition under this scenario.
>
>
> ___
>
> *Mike Wright*
> Principal Architect, Software Engineering
>
> SNL Financial LC
> 434-951-7816 *p*
> 434-244-4466 *f*
> 540-470-0119 *m*
>
> mwri...@snl.com
>
> On Tue, Sep 8, 2015 at 5:38 PM, Richard Marscher  > wrote:
>
>> That seems like it could work, although I don't think `partitionByKey` is
>> a thing, at least for RDD. You might be able to merge step #2 and step #3
>> into one step by using the `reduceByKey` function signature that takes in a
>> Partitioner implementation.
>>
>> def reduceByKey(partitioner: Partitioner, func: (V, V) ⇒ V): RDD[(K, V)]
>>
>> Merge the values for each key using an associative reduce function. This
>> will also perform the merging locally on each mapper before sending results
>> to a reducer, similarly to a "combiner" in MapReduce.
>>
>> The tricky part might be getting the partitioner to know about the number
>> of partitions, which I think it needs to know upfront in `abstract def
>> numPartitions: Int`. The `HashPartitioner` for example takes in the
>> number as a constructor argument, maybe you could use that with an upper
>> bound size if you don't mind empty partitions. Otherwise you might have to
>> mess around to extract the exact number of keys if it's not readily
>> available.
>>
>> Aside: what is the requirement to have each partition only contain the
>> data related to one key?
>>
>> On Fri, Sep 4, 2015 at 11:06 AM, mmike87  wrote:
>>
>>> Hello, I am new to Apache Spark and this is my company's first Spark
>>> project.
>>> Essentially, we are calculating models dealing with Mining data using
>>> Spark.
>>>
>>> I am holding all the source data in a persisted RDD that we will refresh
>>> periodically. When a "scenario" is passed to the Spark job (we're using
>>> Job
>>> Server) the persisted RDD is filtered to the relevant mines. For
>>> example, we
>>> may want all mines in Chile and the 1990-2015 data for each.
>>>
>>> Many of the calculations are cumulative, that is when we apply user-input
>>> "adjustment factors" to a value, we also need the "flexed" value we
>>> calculated for that mine previously.
>>>
>>> To ensure that this works, the idea is to:
>>>
>>> 1) Filter the superset to relevant mines (done)
>>> 2) Group the subset by the unique identifier for the mine. So, a group
>>> may
>>> be all the rows for mine "A" for 1990-2015
>>> 3) I then want to ensure that the RDD is partitioned by the Mine
>>> Identifier
>>> (an Integer).
>>>
>>> It's step 3 that is confusing me. I suspect it's very easy ... do I
>>> simply
>>> use PartitionByKey?
>>>
>>> We're using Java if that makes any difference.
>>>
>>> Thanks!
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> 

Re: Event logging not working when worker machine terminated

2015-09-09 Thread Charles Chao
Fixed in 1.3.1

https://issues.apache.org/jira/browse/SPARK-6950

Thanks, 

Charles





On 9/9/15, 8:54 AM, "David Rosenstrauch"  wrote:

>Thanks for the info.  Do you know if there's a ticket already open for
>this issue?  If so, I'd like to monitor it.
>
>Thanks,
>
>DR
>
>On 09/09/2015 11:50 AM, Charles Chao wrote:
>> I have encountered the same problem after migrating from 1.2.2 to 1.3.0.
>> After some searching this appears to be a bug introduced in 1.3.
>>Hopefully
>> it's fixed in 1.4.
>>
>> Thanks,
>>
>> Charles
>>
>>
>>
>>
>>
>> On 9/9/15, 7:30 AM, "David Rosenstrauch"  wrote:
>>
>>> Standalone.
>>>
>>> On 09/08/2015 11:18 PM, Jeff Zhang wrote:
 What cluster mode do you use ? Standalone/Yarn/Mesos ?


 On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch

 wrote:

> Our Spark cluster is configured to write application history event
> logging
> to a directory on HDFS.  This all works fine.  (I've tested it with
> Spark
> shell.)
>
> However, on a large, long-running job that we ran tonight, one of our
> machines at the cloud provider had issues and had to be terminated
>and
> replaced in the middle of the job.
>
> The job completed correctly, and shows in state FINISHED in the
> "Completed
> Applications" section of the Spark GUI.  However, when I try to look
> at the
> application's history, the GUI says "Application history not found"
>and
> "Application ... is still in progress".
>
> The reason appears to be the machine that was terminated.  When I
> click on
> the executor list for that job, Spark is showing the executor from
>the
> terminated machine as still in state RUNNING.
>
> Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.
>
> Thanks,
>
> DR
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


>>>
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>
>
>-
>To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>For additional commands, e-mail: user-h...@spark.apache.org
>


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: What should be the optimal value for spark.sql.shuffle.partition?

2015-09-09 Thread Richard Marscher
I see you reposted with more details:

"I have 2 TB of
skewed data to process and then convert rdd into dataframe and use it as
table in hiveContext.sql(). I am using 60 executors with 20 GB memory and 4
cores."

If I'm reading that correctly, you have 2TB of data and 1.2TB of memory in
the cluster. I think that's a fundamental problem up front. If it's skewed
then that will be even worse for doing aggregation. I think to start the
data either needs to be broken down or the cluster upgraded unfortunately.
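
For reference, the knob under discussion, set programmatically (note the plural
key name spark.sql.shuffle.partitions; the value 2000 is purely illustrative,
not a recommendation):

// Raise the number of post-shuffle partitions used by hiveContext.sql()
// group-by queries; smaller partitions spill less but add scheduling overhead.
hiveContext.setConf("spark.sql.shuffle.partitions", "2000")
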

On Wed, Sep 9, 2015 at 5:41 PM, Richard Marscher 
wrote:

> Do you have any details about the cluster you are running this against?
> The memory per executor/node, number of executors, and such? Even at a
> shuffle setting of 1000 that would be roughly 1GB per partition assuming
> the 1TB of data includes overheads in the JVM. Maybe try another order of
> magnitude higher for number of shuffle partitions and see where that gets
> you?
>
> On Tue, Sep 1, 2015 at 12:11 PM, unk1102  wrote:
>
>> Hi, I am using Spark SQL (actually hiveContext.sql()) with group by
>> queries and I am running into OOM issues. So I am thinking of increasing the
>> value of spark.sql.shuffle.partitions from the default of 200 to 1000, but it
>> is not helping.
>> Please correct me if I am wrong: these partitions will share the data shuffle
>> load, so the more partitions there are, the less data each has to hold.
>> Please guide me, I am new to Spark. I am using Spark 1.4.0 and I have around
>> 1TB of uncompressed data to process using hiveContext.sql() group by queries.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-optimal-value-for-spark-sql-shuffle-partition-tp24543.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
> Localytics.com  | Our Blog
>  | Twitter  |
> Facebook  | LinkedIn
> 
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com  | Our Blog
 | Twitter  |
Facebook  | LinkedIn



Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?

2015-09-09 Thread Kristina Rogale Plazonic
Thanks so much! I was just looking for it today on spark-packages - you've
read my mind :)
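
Pending the package, the explicit round-trip through Breeze, which sidesteps the
one-sided implicit conversion discussed further down the thread, looks roughly
like this sketch:

import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.linalg.{Vector => SparkVector, Vectors}

// Convert both operands explicitly, operate in Breeze, then convert back.
def add(a: SparkVector, b: SparkVector): SparkVector =
  Vectors.dense((BDV(a.toArray) + BDV(b.toArray)).toArray)

val v1 = Vectors.dense(1.0, 2.0, 3.0)
add(v1, v1)   // [2.0,4.0,6.0]
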

On Wed, Sep 9, 2015 at 5:53 PM, Burak Yavuz  wrote:

> By the way, I published
> http://spark-packages.org/package/brkyvz/lazy-linalg that contains many
> of the arithmetic operations for use in Scala. I really would appreciate
> any feedback!
>
> On Tue, Aug 25, 2015 at 11:06 AM, Kristina Rogale Plazonic <
> kpl...@gmail.com> wrote:
>
>> YES PLEASE!
>>
>> :)))
>>
>> On Tue, Aug 25, 2015 at 1:57 PM, Burak Yavuz  wrote:
>>
>>> Hmm. I have a lot of code on the local linear algebra operations using
>>> Spark's Matrix and Vector representations
>>> done for https://issues.apache.org/jira/browse/SPARK-6442.
>>>
>>> I can make a Spark package with that code if people are interested.
>>>
>>> Best,
>>> Burak
>>>
>>> On Tue, Aug 25, 2015 at 10:54 AM, Kristina Rogale Plazonic <
>>> kpl...@gmail.com> wrote:
>>>
 However I do think it's easier than it seems to write the implicits;
> it doesn't involve new classes or anything. Yes it's pretty much just
> what you wrote. There is a class "Vector" in Spark. This declaration
> can be in an object; you don't implement your own class. (Also you can
> use "toBreeze" to get Breeze vectors.)


 The implicit conversion with the implicit def happens for the first
 vector in the sum, but not the second vector (see below).

 At this point I give up, because I spent way too much time.  I am so
 disappointed.  So many times I heard "Spark makes simple things easy and
 complicated things possible". Well, here is the simplest thing you can
 imagine in linear algebra, but heck, it is not easy or intuitive.  It was
 easier to run a DeepLearning algo (from another library) than add two
 vectors.

 If anybody has a workaround other than implementing your own
 add/subtract/scalarMultiply, PLEASE let me know.

 Here is the code and error from (freshly started) spark-shell:

 scala> import breeze.linalg.{DenseVector => BDV, SparseVector => BSV,
 Vector => BV}
 import breeze.linalg.{DenseVector=>BDV, SparseVector=>BSV, Vector=>BV}

 scala> import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.linalg.Vectors

 scala> val v1 = Vectors.dense(1.0, 2.0, 3.0)
 v1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]

 scala> import org.apache.spark.mllib.linalg.{Vector =>SparkVector}
 import org.apache.spark.mllib.linalg.{Vector=>SparkVector}

 scala> object MyUtils {
  |   implicit def toBreeze(v:SparkVector) = BV(v.toArray)
  | }
 warning: there were 1 feature warning(s); re-run with -feature for
 details
 defined module MyUtils

 scala> import MyUtils._
 import MyUtils._

 scala> v1:BV[Double]
 res2: breeze.linalg.Vector[Double] = DenseVector(1.0, 2.0, 3.0)

 scala> v1 + v1
 :30: error: could not find implicit value for parameter op:
 breeze.linalg.operators.OpAdd.Impl2[breeze.linalg.Vector[Double],org.apache.spark.mllib.linalg.Vector,That]
   v1 + v1
  ^



>>>
>>>
>>
>


Re: Partitions with zero records & variable task times

2015-09-09 Thread mark
The article is interesting but doesn't really help. It has only one
sentence about data distribution in partitions.

How can I diagnose skewed data distribution?

How could evenly sized blocks in HDFS lead to skewed data anyway?
On 9 Sep 2015 2:29 pm, "Akhil Das"  wrote:

> This post here has a bit information
> http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/
>
> Thanks
> Best Regards
>
> On Wed, Sep 9, 2015 at 6:44 AM, mark  wrote:
>
>> As I understand things (maybe naively), my input data are stored in equal
>> sized blocks in HDFS, and each block  represents a partition within Spark
>> when read from HDFS, therefore each block should hold roughly the same
>> number of records.
>>
>> So something is missing in my understanding - what can cause some
>> partitions to have zero records and others to have roughly equal sized
>> chunks (~50k in this case)?
>>
>> Before writing a custom partitioner, I would like to understand why has
>> the default partitioner failed in my case?
>> On 8 Sep 2015 3:00 pm, "Akhil Das"  wrote:
>>
>>> Try using a custom partitioner for the keys so that they will get evenly
>>> distributed across tasks
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Fri, Sep 4, 2015 at 7:19 PM, mark  wrote:
>>>
 I am trying to tune a Spark job and have noticed some strange behavior
 - tasks in a stage vary in execution time, ranging from 2 seconds to 20
 seconds. I assume tasks should all run in roughly the same amount of time
 in a well tuned job.

 So I did some investigation - the fast tasks appear to have no records,
 whilst the slow tasks do. I need help understanding why this is happening.

 The code in the stage is pretty simple. All it does is:

 - filters records
 - maps records to a (key, record) tuple
 - reduces by key

 The data are Avro objects stored in Parquet files in 16MB blocks in
 HDFS.

 To establish how many records in each partition I added this snippet:

 val counts = rdd.mapPartitions(iter => {
   val ctx = TaskContext.get
   val stageId = ctx.stageId
   val partId = ctx.partitionId
   val attemptid = ctx.taskAttemptId()
 Array(Array(stageId, partId, attemptid, iter.size)).iterator }
   , true).collect()

 Which produces the following:

 1  1  0  0
 1  2  1  50489
 1  3  2  0
 1  4  3  0
 1  5  4  0
 1  6  5  53200
 1  7  6  0
 1  8  7  0
 1  9  8  0
 1  10   9  56946
 1  11   10   0
 1  12   11   0
 1  13   12   0
 1  14   13   59209
 1  15   14   0
 1  16   15   0
 1  17   16   0
 1  18   17   50202
 1  19   18   0
 1  20   19   0
 1  21   20   0
 1  22   21   54613
 1  23   22   0
 1  24   23   0
 1  25   24   54157
 1  26   25   0
 1  27   26   0
 1  28   27   0
 1  29   28   53595
 1  30   29   0
 1  31   30   0
 1  32   31   10750


 Looking at the logs, you can see the tasks that contain records have
 the longest run time:

 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0
 (TID 26) in 2782 ms on DG1322 (6/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0
 (TID 8) in 2815 ms on DG1322 (7/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0
 (TID 20) in 2815 ms on DG1322 (8/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0
 (TID 24) in 2840 ms on DG1321 (9/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0
 (TID 30) in 2839 ms on DG1321 (10/32)
 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0
 (TID 12) in 2878 ms on DG1321 (11/32)
 15/09/03 16:26:36 INFO 

Re: ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()

2015-09-09 Thread Ted Yu
Which release of Spark are you using ?

Can you show skeleton of your partitioner and comparator ?

Thanks



> On Sep 9, 2015, at 4:45 PM, Ashish Shenoy  wrote:
> 
> Hi,
> 
> I am trying to sort a RDD pair using repartitionAndSortWithinPartitions() for 
> my key [which is a custom class, not a java primitive] using a custom 
> partitioner on that key and a custom comparator. However, it fails 
> consistently:
> 
> org.apache.spark.SparkException: Job aborted due to stage failure: Task 18 in 
> stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in stage 1.0 
> (TID 202, 172.16.18.25): java.lang.ArrayIndexOutOfBoundsException: -78
> at 
> org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
> at 
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
> at 
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
> at 
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:70)
> at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
> at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
> at scala.Option.foreach(Option.scala:236)
> at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
> at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
> at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
> 
> I also persist the RDD using the "memory and disk" storage level. The stack 
> trace above comes from spark's code and not my application code. Can you pls 
> point out what I am doing wrong ?
> 
> Thanks,
> Ashish


Re: spark.shuffle.spill=false ignored?

2015-09-09 Thread Eric Walker
Hi Richard,

I've stepped away from this issue since I raised my question.  An
additional detail that was unknown at the time: the application did not run
out of disk space in every instance where spilling to disk was encountered;
that appears to have been a one-off problem.  The main challenge was that
the spark.shuffle.spill setting seemed to be ignored.  This might have been
the expected behavior given the skew that was in the data.

More generally, attempts to tweak the application behavior using such
settings as spark.python.worker.memory and spark.shuffle.memoryFraction had
no observable effect.  It is possible that the ignoring of
the spark.shuffle.spill setting was just a manifestation of a larger issue
going back to a misconfiguration.
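
As an aside, a minimal sketch (directories are illustrative) of pointing the
shuffle scratch space at larger volumes via spark.local.dir, which is the kind
of /tmp problem Richard describes below. Note that some cluster managers
override this setting with their own configuration (e.g. SPARK_LOCAL_DIRS or
YARN's local dirs).

import org.apache.spark.{SparkConf, SparkContext}

// Direct shuffle/spill scratch files away from a small /tmp mount.
val conf = new SparkConf()
  .setAppName("shuffle-scratch-dir-example")
  .set("spark.local.dir", "/mnt/spark-tmp,/mnt2/spark-tmp")
val sc = new SparkContext(conf)
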

Eric


On Wed, Sep 9, 2015 at 4:48 PM, Richard Marscher 
wrote:

> Hi Eric,
>
> I just wanted to do a sanity check, do you know what paths it is trying to
> write to? I ask because even without spilling, shuffles always write to
> disk first before transferring data across the network. I had at one point
> encountered this myself where we accidentally had /tmp mounted on a tiny
> disk and kept running out of disk on shuffles even though we also don't
> spill. You may have already considered or ruled this out though.
>
> On Thu, Sep 3, 2015 at 12:56 PM, Eric Walker 
> wrote:
>
>> Hi,
>>
>> I am using Spark 1.3.1 on EMR with lots of memory.  I have attempted to
>> run a large pyspark job several times, specifying
>> `spark.shuffle.spill=false` in different ways.  It seems that the setting
>> is ignored, at least partially, and some of the tasks start spilling large
>> amounts of data to disk.  The job has been fast enough in the past, but
>> once it starts spilling to disk it lands on Miller's planet [1].
>>
>> Is this expected behavior?  Is it a misconfiguration on my part, e.g.,
>> could there be an incompatible setting that is overriding
>> `spark.shuffle.spill=false`?  Is it something that goes back to Spark
>> 1.3.1?  Is it something that goes back to EMR?  When I've allowed the job
>> to continue on for a while, I've started to see Kryo stack traces in the
>> tasks that are spilling to disk.  The stack traces mention there not being
>> enough disk space, although a `df` shows plenty of space (perhaps after the
>> fact, when temporary files have been cleaned up).
>>
>> Has anyone run into something like this before?  I would be happy to see
>> OOM errors, because that would be consistent with one understanding of what
>> might be going on, but I haven't yet.
>>
>> Eric
>>
>>
>> [1] https://www.youtube.com/watch?v=v7OVqXm7_Pk=active
>>
>
>
>
> --
> *Richard Marscher*
> Software Engineer
> Localytics
>


Re: Spark MLlib Decision Tree Node Accuracy

2015-09-09 Thread sethah
If you are able to traverse the tree, then you can extract the id of the leaf
node for each feature vector. This is like a modified predict method where
it returns the leaf node assigned to the data point instead of the
prediction for that leaf node. The following example code should work: 

import org.apache.spark.mllib.tree.model.Node
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.model.DecisionTreeModel
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.tree.configuration.FeatureType._
import org.apache.spark.mllib.linalg.Vector

// Load and parse the data file.
val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split the data into training and test sets (30% held out for testing)
val splits = data.randomSplit(Array(0.7, 0.3))
val (trainingData, testData) = (splits(0), splits(1))

// Train a DecisionTree model.
//  Empty categoricalFeaturesInfo indicates all features are continuous.
val numClasses = 2
val categoricalFeaturesInfo = Map[Int, Int]()
val impurity = "gini"
val maxDepth = 5
val maxBins = 32

val model = DecisionTree.trainClassifier(trainingData, numClasses,
  categoricalFeaturesInfo, impurity, maxDepth, maxBins)

def predictImpl(node: Node, features: Vector): Node = {
  if (node.isLeaf) {
    node
  } else {
    if (node.split.get.featureType == Continuous) {
      if (features(node.split.get.feature) <= node.split.get.threshold) {
        predictImpl(node.leftNode.get, features)
      } else {
        predictImpl(node.rightNode.get, features)
      }
    } else {
      if (node.split.get.categories.contains(features(node.split.get.feature))) {
        predictImpl(node.leftNode.get, features)
      } else {
        predictImpl(node.rightNode.get, features)
      }
    }
  }
}

val nodeIDAndPredsAndLabels = data.map { lp => 
  val node = predictImpl(model.topNode, lp.features)
  (node.id, (node.predict.predict, lp.label))
}

From here, you should be able to perform analysis of the accuracy of each
leaf node.
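
As a follow-on sketch (building on nodeIDAndPredsAndLabels above; the
aggregation itself is illustrative, not from the original post), per-leaf
accuracy can be computed as the fraction of records routed to each leaf whose
label matches the leaf's prediction:

val leafAccuracy = nodeIDAndPredsAndLabels
  .mapValues { case (pred, label) => (if (pred == label) 1L else 0L, 1L) }
  .reduceByKey((a, b) => (a._1 + b._1, a._2 + b._2))
  .mapValues { case (correct, total) => correct.toDouble / total }

leafAccuracy.collect().foreach { case (nodeId, acc) =>
  println(s"leaf node $nodeId: accuracy $acc")
}
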

Note that in the new Spark ML library a predictNodeIndex is implemented
(which is being converted to a predictImpl method) similar to the
implementation above. Hopefully that code helps.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-MLlib-Decision-Tree-Node-Accuracy-tp24561p24629.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Accumulator with non-java-serializable value ?

2015-09-09 Thread Thomas Dudziak
I want to use t-digest with foreachPartition and accumulators (essentially,
create a t-digest per partition and add that to the accumulator, leveraging
the fact that t-digests can be added to each other). I can make t-digests
kryo-serializable easily, but making them java-serializable is not very easy.
Now, when running it (1.4.1), I get this error:

Exception in thread "main" org.apache.spark.SparkException: Task not
serializable
at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:877)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:876)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:876)
at org.apache.spark.sql.DataFrame.foreachPartition(DataFrame.scala:1255)

which makes sense because the suggested way to use accumulators is via
closures. But since in my case I can't easily make the value type
java-serializable, that won't work. Is there another way to pass the
accumulator to the tasks that doesn't involve closures and hence java
serialization ?
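
One possible direction, not from this thread and with assumed t-digest method
names (createDigest/add may need adjusting to the library you use): skip the
accumulator entirely, build one digest per partition, and merge the digests
with reduce(). As far as I understand, per-partition results shipped back this
way go through the configured serializer (e.g. Kryo) rather than the
accumulator update path.

import com.tdunning.math.stats.TDigest

// `values` is an assumed RDD[Double].
val merged = values.mapPartitions { iter =>
  val digest = TDigest.createDigest(100.0)   // assumed factory method
  iter.foreach(v => digest.add(v))           // add one observation
  Iterator(digest)
}.reduce { (a, b) =>
  a.add(b)                                   // assumed digest-merge method
  a
}
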


Re: Does Spark.ml LogisticRegression assumes only Double valued features?

2015-09-09 Thread sethah
When you pass a data frame into the train method of LogisticRegression and
other ML learning algorithms, the data is extracted by using parameters
`labelCol` and `featuresCol` which should have been set before calling the
train method (they default to "label" and "features", respectively).
`featuresCol` should be a Vector type consisting of Doubles. When the train
method is called, it tries to verify that the data type of `featuresCol` is
type Vector and that the data type of `labelCol` is of type Double. It will
throw an exception if other data types are found.

Spark ML has special ways of handling features that are not inherently
continuous or numerical. I urge you to review this question on StackOverflow
which covers it quite well:

http://stackoverflow.com/questions/32277576/spark-ml-categorical-features
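
For reference, a minimal sketch along the lines of that answer. Column names
and the `trainingDf` DataFrame are illustrative: a string column "category", a
numeric column "amount", and a double "label".

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

// Index the string column, one-hot encode it, assemble features, then fit.
val indexer = new StringIndexer().setInputCol("category").setOutputCol("categoryIndex")
val encoder = new OneHotEncoder().setInputCol("categoryIndex").setOutputCol("categoryVec")
val assembler = new VectorAssembler()
  .setInputCols(Array("categoryVec", "amount"))
  .setOutputCol("features")
val lr = new LogisticRegression().setLabelCol("label").setFeaturesCol("features")

val pipeline = new Pipeline().setStages(Array(indexer, encoder, assembler, lr))
// val model = pipeline.fit(trainingDf)
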



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-Spark-ml-LogisticRegression-assumes-only-Double-valued-features-tp24575p24630.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




ArrayIndexOutOfBoundsException when using repartitionAndSortWithinPartitions()

2015-09-09 Thread Ashish Shenoy
Hi,

I am trying to sort a RDD pair using repartitionAndSortWithinPartitions()
for my key [which is a custom class, not a java primitive] using a custom
partitioner on that key and a custom comparator. However, it fails
consistently:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 18
in stage 1.0 failed 4 times, most recent failure: Lost task 18.3 in stage
1.0 (TID 202, 172.16.18.25): java.lang.ArrayIndexOutOfBoundsException: -78
at
org.apache.spark.util.collection.ExternalSorter.spillToPartitionFiles(ExternalSorter.scala:375)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:208)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1273)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1264)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1263)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1263)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:730)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:730)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1457)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1418)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)

I also persist the RDD using the "memory and disk" storage level. The stack
trace above comes from Spark's code and not my application code. Can you
please point out what I am doing wrong?

Thanks,
Ashish
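
A hedged aside on the negative index: one common cause is a custom Partitioner
whose getPartition returns a negative value (for example a plain hashCode %
numPartitions when the hash is negative), which the shuffle writer then uses
as an array index. A minimal sketch of a partitioner that avoids this; the key
class and field are illustrative:

import org.apache.spark.Partitioner

case class LogKey(customer: String, timestamp: Long)

class LogKeyPartitioner(val numPartitions: Int) extends Partitioner {
  override def getPartition(key: Any): Int = {
    val h = key.asInstanceOf[LogKey].customer.hashCode
    // Normalize into [0, numPartitions) even when hashCode is negative.
    ((h % numPartitions) + numPartitions) % numPartitions
  }
}
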


RE: Spark ANN

2015-09-09 Thread Ulanov, Alexander
Thank you, Feynman, this is helpful. The paper that I linked claims a big 
speedup with FFTs for large convolution sizes. Though, as you mentioned, there is 
no FFT transformer in Spark yet. Moreover, we would need a parallel version of 
FFTs to support batch computations. So it is probably worth considering 
matrix-matrix multiplication for convolution optimization, at least as a first 
version. It can also take advantage of data batches.

From: Feynman Liang [mailto:fli...@databricks.com]
Sent: Wednesday, September 09, 2015 12:56 AM
To: Ulanov, Alexander
Cc: Ruslan Dautkhanov; Nick Pentreath; user; na...@yandex.ru
Subject: Re: Spark ANN

My 2 cents:

* There is frequency domain processing available already (e.g. 
spark.ml DCT transformer) but no FFT transformer yet because 
complex numbers are not currently a Spark SQL datatype
* We shouldn't assume signals are even, so we need complex numbers to implement 
the FFT
* I have not closely studied the relative performance tradeoffs, so please do 
let me know if there's a significant difference in practice



On Tue, Sep 8, 2015 at 5:46 PM, Ulanov, Alexander 
> wrote:
That is an option too. Implementing convolutions with FFTs should be considered 
as well http://arxiv.org/pdf/1312.5851.pdf.

From: Feynman Liang [mailto:fli...@databricks.com]
Sent: Tuesday, September 08, 2015 12:07 PM
To: Ulanov, Alexander
Cc: Ruslan Dautkhanov; Nick Pentreath; user; 
na...@yandex.ru
Subject: Re: Spark ANN

Just wondering, why do we need tensors? Is the implementation of convnets using 
im2col (see here) insufficient?

On Tue, Sep 8, 2015 at 11:55 AM, Ulanov, Alexander 
> wrote:
Ruslan, thanks for including me in the discussion!

Dropout and other features such as Autoencoder were implemented, but not merged 
yet in order to have room for improving the internal Layer API. For example, 
there is an ongoing work with convolutional layer that consumes/outputs 2D 
arrays. We’ll probably need to change the Layer’s input/output type to tensors. 
This will influence dropout which will need some refactoring to handle tensors 
too. Also, all new components should have ML pipeline public interface. There 
is an umbrella issue for deep learning in Spark 
https://issues.apache.org/jira/browse/SPARK-5575 which includes various 
features of Autoencoder, in particular 
https://issues.apache.org/jira/browse/SPARK-10408. You are very welcome to join 
and contribute since there is a lot of work to be done.

Best regards, Alexander
From: Ruslan Dautkhanov 
[mailto:dautkha...@gmail.com]
Sent: Monday, September 07, 2015 10:09 PM
To: Feynman Liang
Cc: Nick Pentreath; user; na...@yandex.ru
Subject: Re: Spark ANN

Found a dropout commit from avulanov:
https://github.com/avulanov/spark/commit/3f25e26d10ef8617e46e35953fe0ad1a178be69d

It probably hasn't made its way to MLLib (yet?).



--
Ruslan Dautkhanov

On Mon, Sep 7, 2015 at 8:34 PM, Feynman Liang 
> wrote:
Unfortunately, not yet... Deep learning support (autoencoders, RBMs) is on the
roadmap for 1.6 though, and there is a Spark package for dropout-regularized
logistic regression.


On Mon, Sep 7, 2015 at 3:15 PM, Ruslan Dautkhanov 
> wrote:
Thanks!

It does not look Spark ANN yet supports dropout/dropconnect or any other 
techniques that help avoiding overfitting?
http://www.cs.toronto.edu/~rsalakhu/papers/srivastava14a.pdf
https://cs.nyu.edu/~wanli/dropc/dropc.pdf

ps. There is a small copy-paste typo in
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/ann/BreezeUtil.scala#L43
should read B :)



--
Ruslan Dautkhanov

On Mon, Sep 7, 2015 at 12:47 PM, Feynman Liang 
> wrote:
Backprop is used to compute the gradient here, which is then optimized by SGD
or LBFGS here.

On Mon, Sep 7, 2015 at 11:24 AM, Nick Pentreath 
> wrote:
Haven't checked the actual code but that doc says "MLPC employes 
backpropagation for learning the model. .."?



—
Sent from Mailbox


On Mon, Sep 7, 2015 at 8:18 PM, Ruslan Dautkhanov 
> wrote:
http://people.apache.org/~pwendell/spark-releases/latest/ml-ann.html

Implementation seems 

Re: Can not allocate executor when running spark on mesos

2015-09-09 Thread canan chen
Finally got the answer.  Actually it works fine. The allocation behavior on
mesos is a little different from yarn/standalone: the executor in mesos is
lazily allocated (only when a job is executed), while the executor in
yarn/standalone is allocated when spark-shell is started.



On Tue, Sep 8, 2015 at 10:39 PM, canan chen  wrote:

> Yes, I follow the guide in this doc, and run it as mesos client mode
>
> On Tue, Sep 8, 2015 at 6:31 PM, Akhil Das 
> wrote:
>
>> In which mode are you submitting your application? (coarse-grained or
>> fine-grained(default)). Have you gone through this documentation already?
>> http://spark.apache.org/docs/latest/running-on-mesos.html#using-a-mesos-master-url
>>
>> Thanks
>> Best Regards
>>
>> On Tue, Sep 8, 2015 at 12:54 PM, canan chen  wrote:
>>
>>> Hi all,
>>>
>>> I try to run spark on mesos, but it looks like I can not allocate
>>> resources from mesos. I am not expert of mesos, but from the mesos log, it
>>> seems spark always decline the offer from mesos. Not sure what's wrong,
>>> maybe need some configuration change. Here's the mesos master log
>>>
>>> I0908 15:08:16.515960 301916160 master.cpp:1767] Received registration
>>> request for framework 'Spark shell' at
>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>> I0908 15:08:16.520545 301916160 master.cpp:1834] Registering framework
>>> 20150908-143320-16777343-5050-41965- (Spark shell) at
>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133 with
>>> checkpointing disabled and capabilities [  ]
>>> I0908 15:08:16.522307 300843008 hierarchical.hpp:386] Added framework
>>> 20150908-143320-16777343-5050-41965-
>>> I0908 15:08:16.525845 301379584 master.cpp:4290] Sending 1 offers to
>>> framework 20150908-143320-16777343-5050-41965- (Spark shell) at
>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>> I0908 15:08:16.637677 302452736 master.cpp:2884] Processing DECLINE call
>>> for offers: [ 20150908-143320-16777343-5050-41965-O0 ] for framework
>>> 20150908-143320-16777343-5050-41965- (Spark shell) at
>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>> I0908 15:08:16.639197 299233280 hierarchical.hpp:761] Recovered
>>> cpus(*):8; mem(*):15360; disk(*):470842; ports(*):[31000-32000] (total:
>>> cpus(*):8; mem(*):15360; disk(*):470842; ports(*):[31000-32000], allocated:
>>> ) on slave 20150908-143320-16777343-5050-41965-S0 from framework
>>> 20150908-143320-16777343-5050-41965-
>>> I0908 15:08:21.786932 300306432 master.cpp:4290] Sending 1 offers to
>>> framework 20150908-143320-16777343-5050-41965- (Spark shell) at
>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>> I0908 15:08:21.789979 298696704 master.cpp:2884] Processing DECLINE call
>>> for offers: [ 20150908-143320-16777343-5050-41965-O1 ] for framework
>>> 20150908-143320-16777343-5050-41965- (Spark shell) at
>>> scheduler-1ea1c85b-68bd-40b4-8c7c-ddccfd56f82b@192.168.3.3:57133
>>>
>>
>>
>


Re: Is HDFS required for Spark streaming?

2015-09-09 Thread N B
Thanks Cody and TD.

If we do run with local directories, I suppose the checkpoint operation
will write the various partitions of an RDD into their own local dirs (of
course). So what's the worst that can happen in case of a node failure?
Will the streaming batches continue to process (i.e. does the lost
checkpointed data get recovered or recreated?) or will the entire
Application start seeing Errors from that point onwards?

Thanks
Nikunj


On Tue, Sep 8, 2015 at 11:54 AM, Tathagata Das  wrote:

> You can use local directories in that case but it is not recommended and
> not a well-test code path (so I have no idea what can happen).
>
> On Tue, Sep 8, 2015 at 6:59 AM, Cody Koeninger  wrote:
>
>> Yes, local directories will be sufficient
>>
>> On Sat, Sep 5, 2015 at 10:44 AM, N B  wrote:
>>
>>> Hi TD,
>>>
>>> Thanks!
>>>
>>> So our application does turn on checkpoints but we do not recover upon
>>> application restart (we just blow the checkpoint directory away first and
>>> re-create the StreamingContext) as we don't have a real need for that type
>>> of recovery. However, because the application does reduceByKeyAndWindow
>>> operations, checkpointing has to be turned on. Do you think this scenario
>>> will also only work with HDFS or having local directories suffice?
>>>
>>> Thanks
>>> Nikunj
>>>
>>>
>>>
>>> On Fri, Sep 4, 2015 at 3:09 PM, Tathagata Das 
>>> wrote:
>>>
 Shuffle spills will use local disk, HDFS not needed.
 Spark and Spark Streaming checkpoint info WILL NEED HDFS for
 fault-tolerance. So that stuff can be recovered even if the spark cluster
 nodes go down.

 TD

 On Fri, Sep 4, 2015 at 2:45 PM, N B  wrote:

> Hello,
>
> We have a Spark Streaming program that is currently running on a
> single node in "local[n]" master mode. We currently give it local
> directories for Spark's own state management etc. The input is streaming
> from network/flume and output is also to network/kafka etc, so the process
> as such does not need any distributed file system.
>
> Now, we do want to start distributing this processing across a few
> machines and make a real cluster out of it. However, I am not sure if HDFS
> is a hard requirement for that to happen. I am thinking about the Shuffle
> spills, DStream/RDD persistence and checkpoint info. Do any of these
> require the state to be shared via HDFS? Are there other alternatives that
> can be utilized if state sharing is accomplished via the file system only.
>
> Thanks
> Nikunj
>
>

>>>
>>
>


Tungsten and Spark Streaming

2015-09-09 Thread N B
Hello,

How can we start taking advantage of the performance gains made under
Project Tungsten in Spark 1.5 for a Spark Streaming program?

From what I understand, this is available by default for DataFrames. But
for a program written using Spark Streaming, would we see any potential
gains "out of the box" in 1.5 or will we have to rewrite some portions of
the application code to realize that benefit?

Any insight/documentation links etc in this regard will be appreciated.

Thanks
Nikunj


Creating Parquet external table using HiveContext API

2015-09-09 Thread Mohammad Islam
Hi,
I want to create an external Hive table using HiveContext. I have the following:
1. the full path/location of the Parquet data directory
2. the name of the new table
3. I can get the schema as well

What API will be the best (for 1.3.x or 1.4.x)? I can see 6 createExternalTable()
APIs but am not sure which one will be the best. I didn't find any good
documentation in the source code or Javadoc about the parameters of these APIs
(i.e. path, source, options etc); any help will be appreciated.
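
For what it's worth, a hedged sketch of the overload that takes a data source
name plus options (table name and path are illustrative; please verify the
exact signature against your Spark version):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)
// Registers an external table backed by existing Parquet files; the schema
// is inferred from the Parquet footers.
hiveContext.createExternalTable(
  "my_table",
  "parquet",
  Map("path" -> "hdfs:///data/parquet/my_table"))
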

Regards,
Mohammad


Re: build on spark 1.5.0 error with Execution scala-compile-first of goal & Compile failed via zinc server

2015-09-09 Thread Ted Yu
I used your first command with mvn 3.3.3 (without build/)

The build passed.

FYI

On Wed, Sep 9, 2015 at 8:50 PM, stark_summer  wrote:

> codeurl: http://d3kbcqa49mib13.cloudfront.net/spark-1.5.0.tgz
> build scripts:
>
> build/mvn  -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.1.0 -Pyarn -Phive
> -Dhive-version=0.12.0-cdh5.1.0 -Phive-thriftserver -Pspark-ganglia-lgpl
> -DskipTests clean package
>
> or
>
> ./make-distribution.sh --name 2.3.0 --tgz  -Phadoop-2.3
> -Dhadoop.version=2.3.0-cdh5.1.0 -Pyarn -Phive
> -Dhive-version=0.12.0-cdh5.1.0
> -Phive-thriftserver -Pspark-ganglia-lgpl  -DskipTests
>
> the two build scripts  have same error~
> error info:
> [ERROR] Failed to execute goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first)
> on
> project spark-launcher_2.10: Execution scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed
> ->
> [Help 1]
> org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
> goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
> (scala-compile-first) on project spark-launcher_2.10: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
>
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
> at
>
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
> at
>
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
> at
>
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
> at
>
> org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
> at
>
> org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
> at
>
> org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
> at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355)
> at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:155)
> at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584)
> at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:216)
> at org.apache.maven.cli.MavenCli.main(MavenCli.java:160)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at
>
> org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
> at
>
> org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
> at
> org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
> Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
> scala-compile-first of goal
> net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
> at
>
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:143)
> at
>
> org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
> ... 19 more
> Caused by: Compile failed via zinc server
> at
> sbt_inc.SbtIncrementalCompiler.zincCompile(SbtIncrementalCompiler.java:136)
> at
> sbt_inc.SbtIncrementalCompiler.compile(SbtIncrementalCompiler.java:86)
> at
>
> scala_maven.ScalaCompilerSupport.incrementalCompile(ScalaCompilerSupport.java:303)
> at
> scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:119)
> at
> scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99)
> at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482)
> at
>
> org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
> ... 20 more
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/build-on-spark-1-5-0-error-with-Execution-scala-compile-first-of-goal-Compile-failed-via-zinc-server-tp24632.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Failed when starting Spark 1.5.0 standalone cluster

2015-09-09 Thread Ted Yu
See the following announcement:

http://search-hadoop.com/m/q3RTtojAyW1dabFk

On Wed, Sep 9, 2015 at 9:05 PM, Netwaver  wrote:

> Hi Spark experts,
>  I am trying to migrate my Spark cluster from
> 1.4.1 to latest 1.5.0 , but meet below issues when run start-all.sh script.
>
>   *Exception in thread "main"
> java.lang.NoClassDefFoundError: org/apache/spark/launcher/Main*
> *Caused by: java.lang.ClassNotFoundException:
> org.apache.spark.launcher.Main*
> *at java.net.URLClassLoader$1.run(Unknown Source)*
> *at java.security.AccessController.doPrivileged(Native Method)*
> *at java.net.URLClassLoader.findClass(Unknown Source)*
> *at java.lang.ClassLoader.loadClass(Unknown Source)*
> *at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)*
> *at java.lang.ClassLoader.loadClass(Unknown Source)*
> *Could not find the main class: org.apache.spark.launcher.Main.  Program
> will exit.*
>
> I could easily migrate Spark cluster from 1.3.1 to
> 1.4.1 on the same machines before, I am wondering if Spark 1.5.0 asks for
> some special jars in the classpath?
>I am using JDK 1.6 , don't know if 1.6 is also supported by Spark
> 1.5.0. Any suggestion will be highly appreciated, thank you all.
>
>
>
>


spark streaming 1.3 with kafka connection timeout

2015-09-09 Thread Shushant Arora
Executors in Spark Streaming 1.3 fetch messages from Kafka in batches. What
happens when an executor takes a long time to complete a fetch batch, say in:


import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;

directKafkaStream.foreachRDD(new Function<JavaRDD<String>, Void>() {  // element type shown as String; the generics were lost in the archive
    @Override
    public Void call(JavaRDD<String> v1) throws Exception {
        v1.foreachPartition(new VoidFunction<Iterator<String>>() {
            @Override
            public void call(Iterator<String> t) throws Exception {
                // long running task
            }
        });
        return null;  // Function<..., Void> still has to return a value
    }
});

Will this long-running task drop the connection of the executor to the Kafka
brokers? And how do I handle that? I am getting a connection timeout in my code.


RE: Driver OOM after upgrading to 1.5

2015-09-09 Thread Cheng, Hao
Would it help to add JVM options like:
-XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled
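
For reference, a hedged sketch of where such flags usually go (values are
illustrative). For the driver in client mode, spark.driver.extraJavaOptions
generally has to be supplied via spark-defaults.conf or --driver-java-options
rather than set programmatically, since the driver JVM is already running by
the time SparkConf is built.

import org.apache.spark.SparkConf

// Executor-side PermGen / CMS class-unloading flags set programmatically;
// the driver-side equivalent normally goes in spark-defaults.conf.
val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-XX:MaxPermSize=256m -XX:+CMSClassUnloadingEnabled -XX:+CMSPermGenSweepingEnabled")
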

From: Reynold Xin [mailto:r...@databricks.com]
Sent: Thursday, September 10, 2015 5:31 AM
To: Sandy Ryza
Cc: user@spark.apache.org
Subject: Re: Driver OOM after upgrading to 1.5

It's likely that with codegen, you need a bigger permgen space. Also I found 
that Java 7 doesn't do very well w.r.t. flushing code cache. As a result, Spark 
SQL and DataFrames now run much better under Java 8, because it flushes code 
cache better.


On Wed, Sep 9, 2015 at 2:12 PM, Sandy Ryza 
> wrote:
Java 7.

FWIW I was just able to get it to work by increasing MaxPermSize to 256m.

-Sandy

On Wed, Sep 9, 2015 at 11:37 AM, Reynold Xin 
> wrote:
Java 7 / 8?

On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza 
> wrote:
I just upgraded the 
spark-timeseries project to run 
on top of 1.5, and I'm noticing that tests are failing with OOMEs.

I ran a jmap -histo on the process and discovered the top heap items to be:
   1:163428   22236064  
   2:163428   21112648  
   3: 12638   14459192  
   4: 12638   13455904  
   5: 105397642528  

Not sure whether this is suspicious.  Any ideas?

-Sandy





Filtering an rdd depending upon a list of values in Spark

2015-09-09 Thread prachicsa


I want to apply filter based on a list of values in Spark. This is how I get
the list:

DataFrame df = sqlContext.read().json("../sample.json");

df.groupBy("token").count().show();

Row[] Tokens = df.select("token").collect();
for (int i = 0; i < Tokens.length; i++) {
    System.out.println(Tokens[i].get(0)); // Need to apply the filter for Tokens[i].get(0)
}

The RDD on which I want to apply the filter is this:

JavaRDD<String> file = context.textFile(args[0]);

I figured out a way to filter in Java:

private static final Function<String, Boolean> Filter =
    new Function<String, Boolean>() {
        @Override
        public Boolean call(String s) {
            return s.contains("Set");
        }
    };

How do I go about it?
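
A hedged Scala sketch of one way to do this, assuming the token list fits in
memory; `sc`, `df`, and `file` mirror the names above, with `file` treated as
an RDD[String]:

// Collect the tokens once, broadcast them, and keep only the lines that
// contain at least one token.
val tokenSet = df.select("token").collect().map(_.getString(0)).toSet
val tokenSetBc = sc.broadcast(tokenSet)

val filtered = file.filter(line => tokenSetBc.value.exists(token => line.contains(token)))
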




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Filtering-an-rdd-depending-upon-a-list-of-values-in-Spark-tp24631.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Filtering an rdd depending upon a list of values in Spark

2015-09-09 Thread Ted Yu
Take a look at the following methods:

  /**
   * Filters rows using the given condition.
   * {{{
   *   // The following are equivalent:
   *   peopleDf.filter($"age" > 15)
   *   peopleDf.where($"age" > 15)
   * }}}
   * @group dfops
   * @since 1.3.0
   */
  def filter(condition: Column): DataFrame = Filter(condition.expr, logicalPlan)

  /**
   * Filters rows using the given SQL expression.
   * {{{
   *   peopleDf.filter("age > 15")
   * }}}
   * @group dfops
   * @since 1.3.0
   */
  def filter(conditionExpr: String): DataFrame = {

Cheers

On Wed, Sep 9, 2015 at 8:04 PM, prachicsa  wrote:

>
>
> I want to apply filter based on a list of values in Spark. This is how I
> get
> the list:
>
> DataFrame df = sqlContext.read().json("../sample.json");
>
> df.groupBy("token").count().show();
>
> Tokens = df.select("token").collect();
> for(int i = 0; i < Tokens.length; i++){
> System.out.println(Tokens[i].get(0)); // Need to apply filter
> for Token[i].get(0)
> }
>
> Rdd on which I want apply filter is this:
>
> JavaRDD file = context.textFile(args[0]);
>
> I figured out a way to filter in java:
>
> private static final Function Filter =
> new Function() {
> @Override
> public Boolean call(String s) {
> return s.contains("Set");
> }
> };
>
> How do I go about it?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Filtering-an-rdd-depending-upon-a-list-of-values-in-Spark-tp24631.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


build on spark 1.5.0 error with Execution scala-compile-first of goal & Compile failed via zinc server

2015-09-09 Thread stark_summer
codeurl: http://d3kbcqa49mib13.cloudfront.net/spark-1.5.0.tgz
build scripts:

build/mvn  -Phadoop-2.3 -Dhadoop.version=2.3.0-cdh5.1.0 -Pyarn -Phive
-Dhive-version=0.12.0-cdh5.1.0 -Phive-thriftserver -Pspark-ganglia-lgpl 
-DskipTests clean package

or 

./make-distribution.sh --name 2.3.0 --tgz  -Phadoop-2.3
-Dhadoop.version=2.3.0-cdh5.1.0 -Pyarn -Phive -Dhive-version=0.12.0-cdh5.1.0
-Phive-thriftserver -Pspark-ganglia-lgpl  -DskipTests

The two build scripts fail with the same error.
Error info:
[ERROR] Failed to execute goal
net.alchim31.maven:scala-maven-plugin:3.2.2:compile (scala-compile-first) on
project spark-launcher_2.10: Execution scala-compile-first of goal
net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed. CompileFailed ->
[Help 1]
org.apache.maven.lifecycle.LifecycleExecutionException: Failed to execute
goal net.alchim31.maven:scala-maven-plugin:3.2.2:compile
(scala-compile-first) on project spark-launcher_2.10: Execution
scala-compile-first of goal
net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:224)
at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:153)
at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:145)
at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:116)
at
org.apache.maven.lifecycle.internal.LifecycleModuleBuilder.buildProject(LifecycleModuleBuilder.java:80)
at
org.apache.maven.lifecycle.internal.builder.singlethreaded.SingleThreadedBuilder.build(SingleThreadedBuilder.java:51)
at
org.apache.maven.lifecycle.internal.LifecycleStarter.execute(LifecycleStarter.java:120)
at org.apache.maven.DefaultMaven.doExecute(DefaultMaven.java:355)
at org.apache.maven.DefaultMaven.execute(DefaultMaven.java:155)
at org.apache.maven.cli.MavenCli.execute(MavenCli.java:584)
at org.apache.maven.cli.MavenCli.doMain(MavenCli.java:216)
at org.apache.maven.cli.MavenCli.main(MavenCli.java:160)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.codehaus.plexus.classworlds.launcher.Launcher.launchEnhanced(Launcher.java:289)
at
org.codehaus.plexus.classworlds.launcher.Launcher.launch(Launcher.java:229)
at
org.codehaus.plexus.classworlds.launcher.Launcher.mainWithExitCode(Launcher.java:415)
at
org.codehaus.plexus.classworlds.launcher.Launcher.main(Launcher.java:356)
Caused by: org.apache.maven.plugin.PluginExecutionException: Execution
scala-compile-first of goal
net.alchim31.maven:scala-maven-plugin:3.2.2:compile failed.
at
org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:143)
at
org.apache.maven.lifecycle.internal.MojoExecutor.execute(MojoExecutor.java:208)
... 19 more
Caused by: Compile failed via zinc server
at
sbt_inc.SbtIncrementalCompiler.zincCompile(SbtIncrementalCompiler.java:136)
at
sbt_inc.SbtIncrementalCompiler.compile(SbtIncrementalCompiler.java:86)
at
scala_maven.ScalaCompilerSupport.incrementalCompile(ScalaCompilerSupport.java:303)
at
scala_maven.ScalaCompilerSupport.compile(ScalaCompilerSupport.java:119)
at
scala_maven.ScalaCompilerSupport.doExecute(ScalaCompilerSupport.java:99)
at scala_maven.ScalaMojoSupport.execute(ScalaMojoSupport.java:482)
at
org.apache.maven.plugin.DefaultBuildPluginManager.executeMojo(DefaultBuildPluginManager.java:132)
... 20 more





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/build-on-spark-1-5-0-error-with-Execution-scala-compile-first-of-goal-Compile-failed-via-zinc-server-tp24632.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Failed when starting Spark 1.5.0 standalone cluster

2015-09-09 Thread Netwaver
Hi Spark experts,
 I am trying to migrate my Spark cluster from 1.4.1 to 
latest 1.5.0 , but meet below issues when run start-all.sh script.

  Exception in thread "main" 
java.lang.NoClassDefFoundError: org/apache/spark/launcher/Main
Caused by: java.lang.ClassNotFoundException: org.apache.spark.launcher.Main
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
Could not find the main class: org.apache.spark.launcher.Main.  Program will 
exit.

I could easily migrate Spark cluster from 1.3.1 to 
1.4.1 on the same machines before, I am wondering if Spark 1.5.0 asks for some 
special jars in the classpath?
   I am using JDK 1.6 , don't know if 1.6 is also supported by Spark 1.5.0. 
Any suggestion will be highly appreciated, thank you all.


Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-09 Thread Tathagata Das
The whole point of checkpointing is to recover the *exact* computation
where it left off.
If you want any change in the specification of the computation (which
includes any intervals), then you cannot recover from checkpoint as it can
be an arbitrarily complex issue to deal with changes in the specs,
especially because a lot of specs are tied to each other (e.g. checkpoint
interval dictates other things like clean up intervals, etc.)

Why do you need to change the checkpointing interval at the time of
recovery? Trying to understand your usecase.


On Wed, Sep 9, 2015 at 12:03 PM, Dmitry Goldenberg  wrote:

> >> when you use getOrCreate, and there exists a valid checkpoint, it will
> always return the context from the checkpoint and not call the factory.
> Simple way to see whats going on is to print something in the factory to
> verify whether it is ever called.
>
> This is probably OK. Seems to explain why we were getting a sticky batch
> duration millis value. Once I blew away all the checkpointing directories
> and unplugged the data checkpointing (while keeping the metadata
> checkpointing) the batch duration millis was no longer sticky.
>
> So, there doesn't seem to be a way for metadata checkpointing to override
> its checkpoint duration millis, is there?  Is the default there
> max(batchdurationmillis, 10seconds)?  Is there a way to override this?
> Thanks.
>
>
>
>
>
> On Wed, Sep 9, 2015 at 2:44 PM, Tathagata Das  wrote:
>
>>
>>
>> See inline.
>>
>> On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg <
>> dgoldenberg...@gmail.com> wrote:
>>
>>> What's wrong with creating a checkpointed context??  We WANT
>>> checkpointing, first of all.  We therefore WANT the checkpointed context.
>>>
>>> Second of all, it's not true that we're loading the checkpointed context
>>> independent of whether params.isCheckpointed() is true.  I'm quoting the
>>> code again:
>>>
>>> // This is NOT loading a checkpointed context if isCheckpointed() is
>>> false.
>>> JavaStreamingContext jssc = params.isCheckpointed() ?
>>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>>> params);
>>>
>>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>>> sparkConf, Parameters params) {
>>> JavaStreamingContextFactory factory = new
>>> JavaStreamingContextFactory() {
>>>   @Override
>>>   public JavaStreamingContext create() {
>>> return createContext(sparkConf, params);
>>>   }
>>> };
>>> return *JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>>> factory);*
>>>
>> ^   when you use getOrCreate, and there exists a valid checkpoint, it
>> will always return the context from the checkpoint and not call the
>> factory. Simple way to see whats going on is to print something in the
>> factory to verify whether it is ever called.
>>
>>
>>
>>
>>
>>>   }
>>>
>>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>>> Parameters params) {
>>> // Create context with the specified batch interval, in
>>> milliseconds.
>>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>>> Durations.milliseconds(params.getBatchDurationMillis()));
>>> // Set the checkpoint directory, if we're checkpointing
>>> if (params.isCheckpointed()) {
>>>   jssc.checkpoint(params.getCheckpointDir());
>>>
>>> }
>>> ...
>>> Again, this is *only* calling context.checkpoint() if isCheckpointed()
>>> is true.  And we WANT it to be true.
>>>
>>> What am I missing here?
>>>
>>>
>>>
>


Re:Re: Failed when starting Spark 1.5.0 standalone cluster

2015-09-09 Thread Netwaver
Thank you, Ted, this does help.
One more question: if I just want to upgrade the JDK only for Spark on my cluster 
machines, where can I add the JAVA_HOME environment variable? Does 
conf/spark-env.sh support the JAVA_HOME environment variable? Thanks a lot.







On 2015-09-10 12:45:43, "Ted Yu" wrote:

See the following announcement:


http://search-hadoop.com/m/q3RTtojAyW1dabFk



On Wed, Sep 9, 2015 at 9:05 PM, Netwaver  wrote:

Hi Spark experts,
 I am trying to migrate my Spark cluster from 1.4.1 to 
latest 1.5.0 , but meet below issues when run start-all.sh script.

  Exception in thread "main" 
java.lang.NoClassDefFoundError: org/apache/spark/launcher/Main
Caused by: java.lang.ClassNotFoundException: org.apache.spark.launcher.Main
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
Could not find the main class: org.apache.spark.launcher.Main.  Program will 
exit.

I could easily migrate Spark cluster from 1.3.1 to 
1.4.1 on the same machines before, I am wondering if Spark 1.5.0 asks for some 
special jars in the classpath?
   I am using JDK 1.6 , don't know if 1.6 is also supported by Spark 1.5.0. 
Any suggestion will be highly appreciated, thank you all.





 




RE: Spark streaming -> cassandra : Fault Tolerance

2015-09-09 Thread Samya MAITI
Hi Cody,

Thanks for your reply.

Is there a way, with the Spark Kafka direct API, to stop updating the checkpoint 
if an exception occurs while writing to Cassandra?

That way no messages would be lost: once Cassandra comes back up, we can 
start reading from the point we left off.
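
For context, a hedged Scala sketch of the pattern usually suggested for this:
read the offset ranges from each batch and only record them after the
Cassandra write succeeds, rather than relying on checkpoint timing. The
`directKafkaStream` name comes from the thread; the offset store and the
Cassandra write itself are placeholders you would fill in.

import org.apache.spark.streaming.kafka.{HasOffsetRanges, OffsetRange}

// Stub offset store: replace with writes to ZooKeeper, a database table, etc.
def saveOffsets(ranges: Array[OffsetRange]): Unit =
  ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.untilOffset}"))

directKafkaStream.foreachRDD { rdd =>
  // Capture the Kafka offsets for this batch before any transformation.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... transform rdd and write it to Cassandra here (e.g. saveToCassandra) ...

  // Only once the write has succeeded, record the offsets.
  saveOffsets(offsetRanges)
}
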

Regards,
Sam

From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Thursday, September 10, 2015 1:13 AM
To: Samya MAITI 
Cc: user@spark.apache.org
Subject: Re: Spark streaming -> cassandra : Fault Tolerance

It's been a while since I've looked at the cassandra connector, so I can't give 
you specific advice on it.

But in general, if a spark task fails (uncaught exception), it will be retried 
automatically.  In the case of the kafka direct stream rdd, it will have 
exactly the same messages as the first attempt (as long as they're still in the 
kafka log).

If you or the cassandra connector are catching the exception, the task won't be 
retried automatically and it's up to you to deal with it.



On Wed, Sep 9, 2015 at 2:09 PM, Samya 
> wrote:
Hi Team,

I have an sample spark application which reads from Kafka using direct API &
then does some transformation & stores to cassandra (using
saveToCassandra()).

If Cassandra goes down, then application logs NoHostAvailable exception (as
expected). But in the mean time the new incoming messages are lost, as the
Direct API creates new checkpoint & deletes the previous one's.

Does that mean, I should handle the exception at application side?

Or is there any other hook to handle the same?

Thanks in advance.

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-cassandra-Fault-Tolerance-tp24625.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Tungsten and Spark Streaming

2015-09-09 Thread Tathagata Das
Rewriting is necessary. You will have to convert RDD/DStream operations to
DataFrame operations. So get the RDDs in the DStream using
transform/foreachRDD, convert them to DataFrames, and then do DataFrame
operations.
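
For illustration, a minimal sketch of that pattern; the Event case class and
the eventStream DStream are assumptions, and SQLContext.getOrCreate is assumed
to be available in your Spark version:

import org.apache.spark.sql.SQLContext

case class Event(id: Long, value: Double)

// Convert each micro-batch RDD to a DataFrame inside foreachRDD so the work
// runs through the DataFrame code path.
eventStream.foreachRDD { rdd =>
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  val df = rdd.toDF()   // rdd is an assumed RDD[Event]
  df.groupBy("id").count().show()
}
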

On Wed, Sep 9, 2015 at 9:23 PM, N B  wrote:

> Hello,
>
> How can we start taking advantage of the performance gains made under
> Project Tungsten in Spark 1.5 for a Spark Streaming program?
>
> From what I understand, this is available by default for Dataframes. But
> for a program written using Spark Streaming, would we see any potential
> gains "out of the box" in 1.5 or will we have to rewrite some portions of
> the application code to realize that benefit?
>
> Any insight/documentation links etc in this regard will be appreciated.
>
> Thanks
> Nikunj
>
>


Re: Is HDFS required for Spark streaming?

2015-09-09 Thread Tathagata Das
Actually, I think it won't work. If you are using some operation that
requires RDD checkpointing, then if the checkpoint files cannot be read
(because the executor failed), any subsequent operation that needs that state
data cannot continue. So all subsequent batches will fail.

You could reduce the chances of the state data being lost by replicating
the state RDDs. Set the state DStream persistence level to
StorageLevel.MEMORY_ONLY_SER_2
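
For illustration, a minimal sketch of setting that storage level on a windowed
state stream; the `pairs` DStream and the window/slide durations are
assumptions:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.Seconds

// Replicate the windowed state blocks so a lost executor's data can be read
// from the surviving replica instead of failing the batch.
val windowedCounts = pairs.reduceByKeyAndWindow(
  (a: Long, b: Long) => a + b,   // values entering the window
  (a: Long, b: Long) => a - b,   // values leaving the window
  Seconds(300), Seconds(10))

windowedCounts.persist(StorageLevel.MEMORY_ONLY_SER_2)
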

On Wed, Sep 9, 2015 at 9:18 PM, N B  wrote:

> Thanks Cody and TD.
>
> If we do run with local directories, I suppose the checkpoint operation
> will write the various partitions of an RDD into their own local dirs (of
> course). So what's the worst that can happen in case of a node failure?
> Will the streaming batches continue to process (i.e. does the lost
> checkpointed data get recovered or recreated?) or will the entire
> Application start seeing Errors from that point onwards?
>
> Thanks
> Nikunj
>
>
> On Tue, Sep 8, 2015 at 11:54 AM, Tathagata Das 
> wrote:
>
>> You can use local directories in that case but it is not recommended and
>> not a well-test code path (so I have no idea what can happen).
>>
>> On Tue, Sep 8, 2015 at 6:59 AM, Cody Koeninger 
>> wrote:
>>
>>> Yes, local directories will be sufficient
>>>
>>> On Sat, Sep 5, 2015 at 10:44 AM, N B  wrote:
>>>
 Hi TD,

 Thanks!

 So our application does turn on checkpoints but we do not recover upon
 application restart (we just blow the checkpoint directory away first and
 re-create the StreamingContext) as we don't have a real need for that type
 of recovery. However, because the application does reduceByKeyAndWindow
 operations, checkpointing has to be turned on. Do you think this scenario
 will also only work with HDFS or having local directories suffice?

 Thanks
 Nikunj



 On Fri, Sep 4, 2015 at 3:09 PM, Tathagata Das 
 wrote:

> Shuffle spills will use local disk, HDFS not needed.
> Spark and Spark Streaming checkpoint info WILL NEED HDFS for
> fault-tolerance. So that stuff can be recovered even if the spark cluster
> nodes go down.
>
> TD
>
> On Fri, Sep 4, 2015 at 2:45 PM, N B  wrote:
>
>> Hello,
>>
>> We have a Spark Streaming program that is currently running on a
>> single node in "local[n]" master mode. We currently give it local
>> directories for Spark's own state management etc. The input is streaming
>> from network/flume and output is also to network/kafka etc, so the 
>> process
>> as such does not need any distributed file system.
>>
>> Now, we do want to start distributing this processing across a few
>> machines and make a real cluster out of it. However, I am not sure if 
>> HDFS
>> is a hard requirement for that to happen. I am thinking about the Shuffle
>> spills, DStream/RDD persistence and checkpoint info. Do any of these
>> require the state to be shared via HDFS? Are there other alternatives 
>> that
>> can be utilized if state sharing is accomplished via the file system 
>> only.
>>
>> Thanks
>> Nikunj
>>
>>
>

>>>
>>
>


How to keep history of streaming statistics

2015-09-09 Thread b.bhavesh
Hello, 

How can I keep the history of streaming statistics for completed applications?
Where does Spark store the information that is presented on the UI?
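
For reference, the general mechanism for keeping UI data of completed
applications is the event log plus the history server; whether the streaming
tab itself is replayed depends on your Spark version. A minimal sketch, with
an illustrative HDFS path:

import org.apache.spark.SparkConf

// Write UI events to a shared location so the Spark history server
// (sbin/start-history-server.sh) can replay completed applications.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///spark-event-logs")
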

Thanks,
b.bhavesh



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-keep-history-of-streaming-statistics-tp24635.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




SparkR - Support for Other Models

2015-09-09 Thread Manish MAHESHWARI
Hello,

Is there a timeline for adding support for other model types like SVD, clustering, 
GBM etc. in subsequent releases? 1.5 added support for linear models only.
If there is, where can we find out the tentative timeline?

Thanks,
Manish



Re: JNI issues with mesos

2015-09-09 Thread Adrian Bridgett

Thanks Tim,

There's a little more to it in fact - if I use the 
pre-built-with-hadoop-2.6 binaries, all is good (with correctly named 
tarballs in hdfs).   Using the pre-built with user-provided hadoop 
(including setting SPARK_DIST_CLASSPATH in setup-env.sh) then I get the 
JNI exception.


Aha - I've found the minimal set of changes that fixes it.  I can use 
the user-provided hadoop tarballs, but I _have_ to add spark-env.sh to 
them (which I wasn't expecting - I don't recall seeing this anywhere in 
the docs so I was expecting everything was setup by spark/mesos from the 
client config).


FWIW, spark-env.sh:
export SPARK_DIST_CLASSPATH=$(/opt/hadoop/bin/hadoop classpath)
#export MESOS_NATIVE_JAVA_LIBRARY=/usr/lib/libmesos.so
export SPARK_EXECUTOR_URI=hdfs:///apps/spark/spark15.tgz

Leaving out SPARK_DIST_CLASSPATH leads to 
org.apache.hadoop.fs.FSDataInputStream class errors (as you'd expect).
Leaving out MESOS_NATIVE_JAVA_LIBRARY seems to have no consequences ATM 
(it is set in the client).

Leaving out SPARK_EXECUTOR_URI stops the job starting at all.

spark-defaults.conf isn't required to be in the tarball, on the client 
it's set to:
spark.master 
mesos://zk://mesos-1.example.net:2181,mesos-2.example.net:2181,mesos-3.example.net:2181/mesos

spark.executor.uri hdfs:///apps/spark/spark15.tgz

I guess this is the way forward for us right now, bit uncomfortable as I 
like to understand why :-)


On 09/09/2015 18:43, Tim Chen wrote:

Hi Adrian,

Spark is expecting a specific naming of the tgz and also the folder 
name inside, as this is generated by running make-distribution.sh 
--tgz in the Spark source folder.


If you use a Spark 1.4 tgz generated with that script, upload it to HDFS again 
under the same name, and fix the URI, then it should work.


Tim

On Wed, Sep 9, 2015 at 8:18 AM, Adrian Bridgett > wrote:


5mins later...

Trying 1.5 with a fairly plain build:
./make-distribution.sh --tgz --name os1 -Phadoop-2.6

and on my first attempt stderr showed:
I0909 15:16:49.392144  1619 fetcher.cpp:441] Fetched
'hdfs:///apps/spark/spark15.tgz' to

'/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S1/frameworks/20150826-133446-3217621258-5050-4064-211204/executors/20150826-133446-3217621258-5050-4064-S1/runs/43026ba8-6624-4817-912c-3d7573433102/spark15.tgz'
sh: 1: cd: can't cd to spark15.tgz
sh: 1: ./bin/spark-class: not found

Aha, let's rename the file in hdfs (and the two configs) from
spark15.tgz to spark-1.5.0-bin-os1.tgz...
Success!!!

The same trick with 1.4 doesn't work, but now that I have
something that does I can make progress.

Hopefully this helps someone else :-)

Adrian


On 09/09/2015 16:59, Adrian Bridgett wrote:

I'm trying to run spark (1.4.1) on top of mesos (0.23).  I've
followed the instructions (uploaded spark tarball to HDFS, set
executor uri in both places etc) and yet on the slaves it's
failing to lauch even the SparkPi example with a JNI error.  It
does run with a local master.  A day of debugging later and it's
time to ask for help!

 bin/spark-submit --master mesos://10.1.201.191:5050 \
   --class org.apache.spark.examples.SparkPi /tmp/examples.jar

(I'm putting the jar outside hdfs  - on both client box + slave
(turned off other slaves for debugging) - due to

http://apache-spark-user-list.1001560.n3.nabble.com/Remote-jar-file-td20649.html.
I should note that I had the same JNI errors when using the mesos
cluster dispatcher).

I'm using Oracle Java 8 (no other java - even openjdk - is installed)

As you can see, the slave is downloading the framework fine (you
can even see it extracted on the slave).  Can anyone shed some
light on what's going on - e.g. how is it attempting to run the
executor?

I'm going to try a different JVM (and try a custom spark
distribution) but I suspect that the problem is much more basic.
Maybe it can't find the hadoop native libs?

Any light would be much appreciated :)  I've included the
slave's stderr below:

I0909 14:14:01.405185 32132 logging.cpp:177] Logging to STDERR
I0909 14:14:01.405256 32132 fetcher.cpp:409] Fetcher Info:

{"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/20150826-133446-3217621258-5050-4064-S0\/ubuntu","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"hdfs:\/\/\/apps\/spark\/spark.tgz"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/20150826-133446-3217621258-5050-4064-S0\/frameworks\/20150826-133446-3217621258-5050-4064-211198\/executors\/20150826-133446-3217621258-5050-4064-S0\/runs\/38077da2-553e-4888-bfa3-ece2ab2119f3","user":"ubuntu"}
I0909 14:14:01.406332 32132 fetcher.cpp:364] Fetching URI
'hdfs:///apps/spark/spark.tgz'
I0909 14:14:01.406344 32132 fetcher.cpp:238] Fetching directly
into the sandbox directory
I0909 14:14:01.406358 

Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-09 Thread Dmitry Goldenberg
>> when you use getOrCreate, and there exists a valid checkpoint, it will
always return the context from the checkpoint and not call the factory.
Simple way to see what's going on is to print something in the factory to
verify whether it is ever called.

This is probably OK. Seems to explain why we were getting a sticky batch
duration millis value. Once I blew away all the checkpointing directories
and unplugged the data checkpointing (while keeping the metadata
checkpointing) the batch duration millis was no longer sticky.

So, there doesn't seem to be a way for metadata checkpointing to override
its checkpoint interval, is there?  Is the default
max(batchDurationMillis, 10 seconds)?  Is there a way to override it?
Thanks.
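
For reference, if I'm reading the API right, the data-checkpoint interval can
also be set explicitly per DStream rather than relying on the default (which,
per the docs, is a multiple of the batch interval of at least 10 seconds). A
minimal Scala sketch - the socket source, state logic and paths are purely
illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("checkpoint-interval-sketch").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("/tmp/checkpoints")  // illustrative checkpoint directory

val counts = ssc.socketTextStream("localhost", 9999)
  .map(line => (line, 1L))
  .updateStateByKey((vals: Seq[Long], state: Option[Long]) =>
    Some(state.getOrElse(0L) + vals.sum))

// Explicitly override the data-checkpoint interval for this stateful stream.
counts.checkpoint(Seconds(30))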





On Wed, Sep 9, 2015 at 2:44 PM, Tathagata Das  wrote:

>
>
> See inline.
>
> On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg <
> dgoldenberg...@gmail.com> wrote:
>
>> What's wrong with creating a checkpointed context??  We WANT
>> checkpointing, first of all.  We therefore WANT the checkpointed context.
>>
>> Second of all, it's not true that we're loading the checkpointed context
>> independent of whether params.isCheckpointed() is true.  I'm quoting the
>> code again:
>>
>> // This is NOT loading a checkpointed context if isCheckpointed() is
>> false.
>> JavaStreamingContext jssc = params.isCheckpointed() ?
>> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
>> params);
>>
>>   private JavaStreamingContext createCheckpointedContext(SparkConf
>> sparkConf, Parameters params) {
>> JavaStreamingContextFactory factory = new
>> JavaStreamingContextFactory() {
>>   @Override
>>   public JavaStreamingContext create() {
>> return createContext(sparkConf, params);
>>   }
>> };
>> return *JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
>> factory);*
>>
> ^   when you use getOrCreate, and there exists a valid checkpoint, it
> will always return the context from the checkpoint and not call the
> factory. Simple way to see what's going on is to print something in the
> factory to verify whether it is ever called.
>
>
>
>
>
>>   }
>>
>>   private JavaStreamingContext createContext(SparkConf sparkConf,
>> Parameters params) {
>> // Create context with the specified batch interval, in milliseconds.
>> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
>> Durations.milliseconds(params.getBatchDurationMillis()));
>> // Set the checkpoint directory, if we're checkpointing
>> if (params.isCheckpointed()) {
>>   jssc.checkpoint(params.getCheckpointDir());
>>
>> }
>> ...
>> Again, this is *only* calling context.checkpoint() if isCheckpointed() is
>> true.  And we WANT it to be true.
>>
>> What am I missing here?
>>
>>
>>


Spark streaming -> cassandra : Fault Tolerance

2015-09-09 Thread Samya
Hi Team,

I have an sample spark application which reads from Kafka using direct API &
then does some transformation & stores to cassandra (using
saveToCassandra()).

If Cassandra goes down, the application logs a NoHostAvailable exception (as
expected). But in the meantime the new incoming messages are lost, as the
direct API creates a new checkpoint & deletes the previous ones.

Does that mean, I should handle the exception at application side?

Or is there any other hook to handle the same?

Thanks in advance.

Regards,
Sam



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-cassandra-Fault-Tolerance-tp24625.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Driver OOM after upgrading to 1.5

2015-09-09 Thread Reynold Xin
Java 7 / 8?

On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza  wrote:

> I just upgraded the spark-timeseries
>  project to run on top of
> 1.5, and I'm noticing that tests are failing with OOMEs.
>
> I ran a jmap -histo on the process and discovered the top heap items to be:
>1:163428   22236064  
>2:163428   21112648  
>3: 12638   14459192  
>4: 12638   13455904  
>5: 105397642528  
>
> Not sure whether this is suspicious.  Any ideas?
>
> -Sandy
>


Re: Spark Streaming checkpoints and code upgrade

2015-09-09 Thread Tathagata Das
It's pretty much impossible to do across arbitrary code changes. For that,
the best way forward is to store and load the offsets yourselves.
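
If it helps, a rough Scala sketch of that pattern with the Kafka direct API -
the load/save functions below are placeholders for whatever durable store you
choose (ZooKeeper, a database, ...) and are not Spark APIs:

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

def createStream(ssc: StreamingContext,
                 kafkaParams: Map[String, String],
                 loadOffsets: () => Map[TopicAndPartition, Long],
                 saveOffsets: Array[OffsetRange] => Unit) = {
  // Start exactly from the offsets stored by the previous run of the job.
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder,
      StringDecoder, (String, String)](
    ssc, kafkaParams, loadOffsets(),
    (mmd: MessageAndMetadata[String, String]) => (mmd.key, mmd.message))

  stream.foreachRDD { rdd =>
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    // ... process rdd ...
    saveOffsets(ranges)  // persist offsets only after the batch's work succeeds
  }
  stream
}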

On Wed, Sep 9, 2015 at 10:19 AM, Nicolas Monchy  wrote:

> Hello,
>
> I am using Spark Streaming and the Kafka Direct API and I am checkpointing
> the metadata.
> Checkpoints aren't recoverable if you upgrade code so I am losing the last
> consumed offsets in this case.
>
> I know I can build a system to store and load the offsets for each batch
> but before implementing that I would like to know if checkpoints are going
> to be able to recover a code upgrade in a foreseeable future ?
>
> Thanks,
> Nicolas
>


Re: Batchdurationmillis seems "sticky" with direct Spark streaming

2015-09-09 Thread Tathagata Das
See inline.

On Tue, Sep 8, 2015 at 9:02 PM, Dmitry Goldenberg 
wrote:

> What's wrong with creating a checkpointed context??  We WANT
> checkpointing, first of all.  We therefore WANT the checkpointed context.
>
> Second of all, it's not true that we're loading the checkpointed context
> independent of whether params.isCheckpointed() is true.  I'm quoting the
> code again:
>
> // This is NOT loading a checkpointed context if isCheckpointed() is false.
> JavaStreamingContext jssc = params.isCheckpointed() ?
> createCheckpointedContext(sparkConf, params) : createContext(sparkConf,
> params);
>
>   private JavaStreamingContext createCheckpointedContext(SparkConf
> sparkConf, Parameters params) {
> JavaStreamingContextFactory factory = new
> JavaStreamingContextFactory() {
>   @Override
>   public JavaStreamingContext create() {
> return createContext(sparkConf, params);
>   }
> };
> return *JavaStreamingContext.getOrCreate(params.getCheckpointDir(),
> factory);*
>
^   when you use getOrCreate, and there exists a valid checkpoint, it
will always return the context from the checkpoint and not call the
factory. Simple way to see what's going on is to print something in the
factory to verify whether it is ever called.





>   }
>
>   private JavaStreamingContext createContext(SparkConf sparkConf,
> Parameters params) {
> // Create context with the specified batch interval, in milliseconds.
> JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
> Durations.milliseconds(params.getBatchDurationMillis()));
> // Set the checkpoint directory, if we're checkpointing
> if (params.isCheckpointed()) {
>   jssc.checkpoint(params.getCheckpointDir());
>
> }
> ...
> Again, this is *only* calling context.checkpoint() if isCheckpointed() is
> true.  And we WANT it to be true.
>
> What am I missing here?
>
>
>


Re: Spark Streaming checkpoints and code upgrade

2015-09-09 Thread Nicolas Monchy
Ok. Thank you for the reply.

On Wed, Sep 9, 2015 at 11:40 AM, Tathagata Das  wrote:

> It's pretty much impossible to do across arbitrary code changes. For that,
> the best way forward is to store and load the offsets yourselves.
>
> On Wed, Sep 9, 2015 at 10:19 AM, Nicolas Monchy 
> wrote:
>
>> Hello,
>>
>> I am using Spark Streaming and the Kafka Direct API and I am
>> checkpointing the metadata.
>> Checkpoints aren't recoverable if you upgrade code so I am losing the
>> last consumed offsets in this case.
>>
>> I know I can build a system to store and load the offsets for each batch
>> but before implementing that I would like to know if checkpoints are going
>> to be able to recover a code upgrade in a foreseeable future ?
>>
>> Thanks,
>> Nicolas
>>
>
>


-- 
Nicolas Monchy  |  Software Engineer, Big Data
*GumGum*   |  *Ads that stick*
424 375 8823  |  nico...@gumgum.com


Re: performance when checking if data frame is empty or not

2015-09-09 Thread Ted Yu
Have you tried:

df.rdd.isEmpty
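
For comparison, a couple of variants (Spark 1.3+ APIs, sketched from memory);
the take(1)-style check stops as soon as it finds a row instead of counting
everything:

import org.apache.spark.sql.DataFrame

def isEmpty(df: DataFrame): Boolean = df.rdd.isEmpty            // take(1) under the hood
def isEmptyViaLimit(df: DataFrame): Boolean = df.limit(1).count() == 0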

Cheers

On Tue, Sep 8, 2015 at 1:22 PM, Axel Dahl  wrote:

> I have a join, that fails when one of the data frames is empty.
>
> To avoid this I am hoping to check if the dataframe is empty or not before
> the join.
>
> The question is what's the most performant way to do that?
>
> should I do df.count() or df.first() or something else?
>
> Thanks in advance,
>
> -Axel
>


Cores per executors

2015-09-09 Thread Thomas Gerber
Hello,

I was wondering how Spark enforces the use of *only* X cores per
executor.

Is it simply running max Y tasks in parallel on each executor where X = Y
* spark.task.cpus? (This is what I understood from browsing
TaskSchedulerImpl).

Which would mean the processing power used for "map"- (if any) and
"reduce"-side shuffle sorting is unbounded (ExternalAppendOnlyMap and
ExternalSorter, I guess)?
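
As a worked example of that relationship (values illustrative, not from this
thread): with 8 cores booked per executor and 2 CPUs booked per task, the
scheduler runs at most 8 / 2 = 4 tasks concurrently on each executor; the
"cores" are scheduling slots rather than a hard OS-level cap in most
deployments.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.cores", "8")  // slots advertised per executor
  .set("spark.task.cpus", "2")       // slots each task books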

Thanks,
Thomas


Problems with Local Checkpoints

2015-09-09 Thread Bryan Jeffrey
Hello.

I have some basic code that counts numbers using updateStateByKey.  I set up
a streaming context with checkpointing as follows:

def createStreamingContext(masterName : String, checkpointDirectory :
String, timeWindow : Int) : StreamingContext = {
  val sparkConf = new SparkConf().setAppName("Program")
  val ssc = new StreamingContext(sparkConf, Seconds(timeWindow))
  ssc.checkpoint(checkpointDirectory)
  ssc
}


This runs fine on my distributed (Linux) cluster, writing checkpoints to
local disk. However, when I run on my Windows desktop I am seeing a number
of checkpoint errors:

15/09/09 13:57:06 INFO CheckpointWriter: Saving checkpoint for time
1441821426000 ms to file
'file:/C:/Temp/sparkcheckpoint/checkpoint-1441821426000'
Exception in thread "pool-14-thread-4" java.lang.NullPointerException
 at java.lang.ProcessBuilder.start(ProcessBuilder.java:1012)
 at org.apache.hadoop.util.Shell.runCommand(Shell.java:404)
 at org.apache.hadoop.util.Shell.run(Shell.java:379)
 at
org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:589)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:678)
 at org.apache.hadoop.util.Shell.execCommand(Shell.java:661)
 at
org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:639)
 at
org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:468)
 at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:456)
 at
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:905)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:886)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:783)
 at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:772)
 at
org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:181)
 at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
 at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
 at java.lang.Thread.run(Thread.java:745)

JAVA_HOME is set correctly, the code runs correctly, it's not a permissions
issue (I've run this as Administrator).  Directories and files are being
created in C:\Temp, although all of the files appear to be empty.

Does anyone have an idea of what is causing these errors?  Has anyone seen
something similar?

Regards,

Bryan Jeffrey


Spark UI keep redirecting to /null and returns 500

2015-09-09 Thread Rajeev Prasad
Hi All,

I am having a problem accessing the Spark UI while running in spark-client
mode. It works fine in local mode.

It keeps redirecting back to itself, adding /null at the end, and ultimately
runs out of the URL size limit and returns 500. See the output below.

I have a feeling that I might be missing some config; I played with various
config settings for YARN with no success.

I am using spark version 1.3.1

Any help will be greatly appreciated.


--2015-09-09 11:22:17--  http://192.168.13.37:4040/

Connecting to 192.168.13.37:4040... connected.

HTTP request sent, awaiting response...

  HTTP/1.1 302 Found

  Location: http://192.168.13.37:4040/null/

  Content-Length: 0

  Server: Jetty(8.y.z-SNAPSHOT)

Location: http://192.168.13.37:4040/null/ [following]

--2015-09-09 11:22:17--  http://192.168.13.37:4040/null/

Reusing existing connection to 192.168.13.37:4040.

HTTP request sent, awaiting response...

  HTTP/1.1 302 Found

  Location: http://192.168.13.37:4040/null/null/null/

  Content-Length: 0

  Server: Jetty(8.y.z-SNAPSHOT)

Location: http://192.168.13.37:4040/null/null/null/ [following]

--2015-09-09 11:22:17--  http://192.168.13.37:4040/null/null/null/

Reusing existing connection to 192.168.13.37:4040.

HTTP request sent, awaiting response...

  HTTP/1.1 302 Found

  Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/

  Content-Length: 0

  Server: Jetty(8.y.z-SNAPSHOT)

Location: http://192.168.13.37:4040/null/null/null/null/null/null/null/
[following]

--2015-09-09 11:22:17--
http://192.168.13.37:4040/null/null/null/null/null/null/null/

Reusing existing connection to 192.168.13.37:4040.

HTTP request sent, awaiting response...

  HTTP/1.1 302 Found

  Location:
http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/

  Content-Length: 0

  Server: Jetty(8.y.z-SNAPSHOT)

Location:
http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/
[following]

--2015-09-09 11:22:17--
http://192.168.13.37:4040/null/null/null/null/null/null/null/null/null/null/null/null/null/null/null/

Reusing existing connection to 192.168.13.37:4040.

Here is stack dump:

15/09/09 11:22:18 WARN server.Response: Committed before 500 null

15/09/09 11:22:18 WARN server.AbstractHttpConnection:

Re: JNI issues with mesos

2015-09-09 Thread Tim Chen
Hi Adrian,

Spark is expecting a specific naming of the tgz and also the folder name
inside, as this is generated by running make-distribution.sh --tgz in the
Spark source folder.

If you use a Spark 1.4 tgz generated with that script, with the same name,
upload it to HDFS again and fix the URI, then it should work.

Tim

On Wed, Sep 9, 2015 at 8:18 AM, Adrian Bridgett 
wrote:

> 5mins later...
>
> Trying 1.5 with a fairly plain build:
> ./make-distribution.sh --tgz --name os1 -Phadoop-2.6
>
> and on my first attempt stderr showed:
> I0909 15:16:49.392144  1619 fetcher.cpp:441] Fetched
> 'hdfs:///apps/spark/spark15.tgz' to
> '/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S1/frameworks/20150826-133446-3217621258-5050-4064-211204/executors/20150826-133446-3217621258-5050-4064-S1/runs/43026ba8-6624-4817-912c-3d7573433102/spark15.tgz'
> sh: 1: cd: can't cd to spark15.tgz
> sh: 1: ./bin/spark-class: not found
>
> Aha, let's rename the file in hdfs (and the two configs) from spark15.tgz
> to spark-1.5.0-bin-os1.tgz...
> Success!!!
>
> The same trick with 1.4 doesn't work, but now that I have something that
> does I can make progress.
>
> Hopefully this helps someone else :-)
>
> Adrian
>
>
> On 09/09/2015 16:59, Adrian Bridgett wrote:
>
> I'm trying to run spark (1.4.1) on top of mesos (0.23).  I've followed the
> instructions (uploaded spark tarball to HDFS, set executor uri in both
> places etc) and yet on the slaves it's failing to launch even the SparkPi
> example with a JNI error.  It does run with a local master.  A day of
> debugging later and it's time to ask for help!
>
>  bin/spark-submit --master mesos://10.1.201.191:5050 --class
> org.apache.spark.examples.SparkPi /tmp/examples.jar
>
> (I'm putting the jar outside hdfs  - on both client box + slave (turned
> off other slaves for debugging) - due to
> 
> http://apache-spark-user-list.1001560.n3.nabble.com/Remote-jar-file-td20649.html.
> I should note that I had the same JNI errors when using the mesos cluster
> dispatcher).
>
> I'm using Oracle Java 8 (no other java - even openjdk - is installed)
>
> As you can see, the slave is downloading the framework fine (you can even
> see it extracted on the slave).  Can anyone shed some light on what's going
> on - e.g. how is it attempting to run the executor?
>
> I'm going to try a different JVM (and try a custom spark distribution) but
> I suspect that the problem is much more basic. Maybe it can't find the
> hadoop native libs?
>
> Any light would be much appreciated :)  I've included the slave's stderr
> below:
>
> I0909 14:14:01.405185 32132 logging.cpp:177] Logging to STDERR
> I0909 14:14:01.405256 32132 fetcher.cpp:409] Fetcher Info:
> {"cache_directory":"\/tmp\/mesos\/fetch\/slaves\/20150826-133446-3217621258-5050-4064-S0\/ubuntu","items":[{"action":"BYPASS_CACHE","uri":{"extract":true,"value":"hdfs:\/\/\/apps\/spark\/spark.tgz"}}],"sandbox_directory":"\/tmp\/mesos\/slaves\/20150826-133446-3217621258-5050-4064-S0\/frameworks\/20150826-133446-3217621258-5050-4064-211198\/executors\/20150826-133446-3217621258-5050-4064-S0\/runs\/38077da2-553e-4888-bfa3-ece2ab2119f3","user":"ubuntu"}
> I0909 14:14:01.406332 32132 fetcher.cpp:364] Fetching URI
> 'hdfs:///apps/spark/spark.tgz'
> I0909 14:14:01.406344 32132 fetcher.cpp:238] Fetching directly into the
> sandbox directory
> I0909 14:14:01.406358 32132 fetcher.cpp:176] Fetching URI
> 'hdfs:///apps/spark/spark.tgz'
> I0909 14:14:01.679055 32132 fetcher.cpp:104] Downloading resource with
> Hadoop client from 'hdfs:///apps/spark/spark.tgz' to
> '/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz'
> I0909 14:14:05.492626 32132 fetcher.cpp:76] Extracting with command: tar
> -C
> '/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3'
> -xf
> '/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz'
> I0909 14:14:07.489753 32132 fetcher.cpp:84] Extracted
> '/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3/spark.tgz'
> into
> '/tmp/mesos/slaves/20150826-133446-3217621258-5050-4064-S0/frameworks/20150826-133446-3217621258-5050-4064-211198/executors/20150826-133446-3217621258-5050-4064-S0/runs/38077da2-553e-4888-bfa3-ece2ab2119f3'
> W0909 14:14:07.489784 32132 fetcher.cpp:260] Copying instead of extracting
> resource from URI with 'extract' 

Re: bad substitution for [hdp.version] Error in spark on YARN job

2015-09-09 Thread Jeetendra Gangele
Finally it worked out. I solved it by modifying mapred-site.xml and removing
the HDP version references from the YARN application master entry.



On 9 September 2015 at 17:44, Jeetendra Gangele 
wrote:

> Hi ,
> I am getting below error when running the spark job on YARN with HDP
> cluster.
> I have installed spark and yarn from Ambari and I am using spark 1.3.1
> with HDP version HDP-2.3.0.0-2557.
>
> My spark-default.conf has correct entry
>
> spark.driver.extraJavaOptions -Dhdp.version=2.3.0.0-2557
> spark.yarn.am.extraJavaOptions -Dhdp.version=2.3.0.0-2557
>
> Can anybody from HDP reply on this? Not sure why hdp.version is not getting
> passed, though it is set up in the conf file correctly. I tried passing the
> same to spark-submit with --conf "hdp.version=2.3.0.0-2557" - same issue, no luck.
>
> I am running my job with spark-submit from spark-client machine
>
>
>
> Exit code: 1
> Exception message:
> /hadoop/yarn/local/usercache/hdfs/appcache/application_1441798371988_0002/container_e08_1441798371988_0002_01_05/launch_container.sh:
> line 22:
> $PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
> bad substitution
>
> Stack trace: ExitCodeException exitCode=1:
> /hadoop/yarn/local/usercache/hdfs/appcache/application_1441798371988_0002/container_e08_1441798371988_0002_01_05/launch_container.sh:
> line 22:
> $PWD:$PWD/__spark__.jar:$HADOOP_CONF_DIR:/usr/hdp/current/hadoop-client/*:/usr/hdp/current/hadoop-client/lib/*:/usr/hdp/current/hadoop-hdfs-client/*:/usr/hdp/current/hadoop-hdfs-client/lib/*:/usr/hdp/current/hadoop-yarn-client/*:/usr/hdp/current/hadoop-yarn-client/lib/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/*:$PWD/mr-framework/hadoop/share/hadoop/mapreduce/lib/*:$PWD/mr-framework/hadoop/share/hadoop/common/*:$PWD/mr-framework/hadoop/share/hadoop/common/lib/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/*:$PWD/mr-framework/hadoop/share/hadoop/yarn/lib/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/*:$PWD/mr-framework/hadoop/share/hadoop/hdfs/lib/*:$PWD/mr-framework/hadoop/share/hadoop/tools/lib/*:/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar:/etc/hadoop/conf/secure:
> bad substitution
>
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:545)
> at org.apache.hadoop.util.Shell.run(Shell.java:456)
> at
> org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:722)
> at
> org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at
> org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
>
> Container exited with a non-zero exit code 1
>
>


Driver OOM after upgrading to 1.5

2015-09-09 Thread Sandy Ryza
I just upgraded the spark-timeseries
 project to run on top of
1.5, and I'm noticing that tests are failing with OOMEs.

I ran a jmap -histo on the process and discovered the top heap items to be:
   1:163428   22236064  
   2:163428   21112648  
   3: 12638   14459192  
   4: 12638   13455904  
   5: 105397642528  

Not sure whether this is suspicious.  Any ideas?

-Sandy


Spark Streaming checkpoints and code upgrade

2015-09-09 Thread Nicolas Monchy
Hello,

I am using Spark Streaming and the Kafka Direct API and I am checkpointing
the metadata.
Checkpoints aren't recoverable if you upgrade code so I am losing the last
consumed offsets in this case.

I know I can build a system to store and load the offsets for each batch
but before implementing that I would like to know if checkpoints are going
to be able to survive a code upgrade in the foreseeable future?

Thanks,
Nicolas


Spark rdd.mapPartitionsWithIndex() hits physical memory limit after huge data shuffle

2015-09-09 Thread unk1102
Hi, I have the following Spark code, which involves huge data shuffling even
though I am using mapPartitionsWithIndex() with shuffle = false. I have 2 TB of
skewed data to process; I then convert the RDD into a DataFrame and use it as a
table in hiveContext.sql(). I am using 60 executors with 20 GB memory and 4
cores each. I specify spark.yarn.executor.memoryOverhead as 8500, which is high
enough. I am using the default settings for spark.shuffle.memoryFraction and
spark.storage.memoryFraction; I also tried changing them, but nothing helped. I
am using Spark 1.4.0. Please guide me - I am new to Spark - and help me optimize
the following code. Thanks in advance.

JavaRDD<Row> indexedRdd = sourceRdd.cache().mapPartitionsWithIndex(
    new Function2<Integer, Iterator<Row>, Iterator<Row>>() {
      @Override
      public Iterator<Row> call(Integer ind, Iterator<Row> rowIterator)
          throws Exception {
        List<Row> rowList = new ArrayList<>();
        while (rowIterator.hasNext()) {
          Row row = rowIterator.next();
          List<Object> rowAsList =
              iterate(JavaConversions.seqAsJavaList(row.toSeq()));
          Row updatedRow = RowFactory.create(rowAsList.toArray());
          rowList.add(updatedRow);
        }
        return rowList.iterator();
      }
    }, false).union(remainingRdd);
DataFrame baseFrame =
    hiveContext.createDataFrame(indexedRdd, MySchema.class);
baseFrame.registerTempTable("baseTable");
hiveContext.sql("insert into abc bla bla using baseTable group by bla bla");
hiveContext.sql("insert into def bla bla using baseTable group by bla bla");



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-rdd-mapPartitionsWithIndex-hits-physical-memory-limit-after-huge-data-shuffle-tp24627.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Partitioning a RDD for training multiple classifiers

2015-09-09 Thread Maximo Gurmendez
Adding an example (very raw), to see if my understanding is correct:

val repartitioned = bigRdd.partitionBy(new Partitioner {
  def numPartitions: Int = 100

  def getPartition(clientId: Any): Int =
    (clientId.hashCode & Integer.MAX_VALUE) % numPartitions
})

val cachedRdd = repartitioned.cache()

val client1Rdd = cachedRdd.filter { case (clientId: String, v: String) =>
  clientId.equals("Client 1") }

val client2Rdd = cachedRdd.filter { case (clientId: String, v: String) =>
  clientId.equals("Client 2") }

MyModel.train(client1Rdd)
MyModel.train(client2Rdd)

When the first MyModel.train() runs, it will trigger an action and cause the
repartitioning of the original bigRdd and its caching.

Can someone confirm if the following statements are true?

1) When I execute an action on client2Rdd, it will only read from the partition 
that corresponds to Client 2 (it won’t iterate over ALL items originally in 
bigRdd)
2) The caching happens in a way that preserves the partitioning by client Id 
(and the locality)

Thanks,
   Maximo

PS: I am aware that this might cause data imbalance, but I can probably 
mitigate that with a smarter partitioner.


On Sep 9, 2015, at 9:30 AM, Maximo Gurmendez wrote:

Thanks Ben for your answer. I'll explore what happens under the hood in a
DataFrame.

Regarding the ability to split a large RDD into n RDDs without requiring n 
passes over the large RDD: can partitionBy() help? If I partition by a key that 
corresponds to the split criteria (i.e. client id) and then cache each of 
those RDDs, will that lessen the effect of repeated large traversals (since 
Spark will figure out that for each smaller RDD it just needs to traverse a 
subset of the partitions)?

Thanks!
   Máximo


On Sep 8, 2015, at 11:32 AM, Ben Tucker wrote:

Hi Maximo —

This is a relatively naive answer, but I would consider structuring the RDD 
into a DataFrame, then saving the 'splits' using something like 
DataFrame.write.parquet(hdfs_path, byPartition=('client')). You could then read 
a DataFrame from each resulting parquet directory and do your per-client work 
from these. You mention re-using the splits, so this solution might be worth 
the file-writing time.
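
If it helps, in the Spark 1.4 DataFrame API that idea is spelled with
partitionBy on the writer; a rough Scala sketch (the path, table and column
names are just illustrative):

import org.apache.spark.sql.SQLContext

def writeSplits(sqlContext: SQLContext, path: String): Unit = {
  val df = sqlContext.table("bids")  // hypothetical source table
  df.write.partitionBy("client").parquet(path)
  // Reading one client's split back should only need that partition's
  // directory, thanks to partition pruning on the filter below.
  val client1 = sqlContext.read.parquet(path).where("client = 'Client 1'")
}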

Does anyone know of a method that gets a collection of DataFrames — one for 
each partition, in the byPartition=('client') sense — from a 'big' DataFrame? 
Basically, the equivalent of writing by partition and creating a DataFrame for 
each result, but skipping the HDFS step.


On Tue, Sep 8, 2015 at 10:47 AM, Maximo Gurmendez wrote:
Hi,
I have an RDD that needs to be split (say, by client) in order to train n 
models (i.e. one for each client). Since most of the classifiers that come with 
MLlib can only accept a single RDD as input (and cannot build multiple models in one 
pass - as I understand it), the only way to train n separate models is to 
create n RDDs (by filtering the original one).

Conceptually:

rdd1,rdd2,rdd3 = splitRdds(bigRdd)

the function splitRdd would use the standard filter mechanism.  I would then 
need to submit n training spark jobs. When I do this, will it mean that it will 
traverse the bigRdd n times? Is there a better way to persist the split RDDs 
(i.e. save them in a cache)?

I could cache the bigRdd, but I'm not sure that would be very efficient either, since 
it will require the same number of passes anyway (I think - but I'm relatively 
new to Spark). Also I'm planning on reusing the individual splits (rdd1, rdd2, 
etc.), so it would be convenient to have them individually cached.

Another problem is that the splits could be very skewed (i.e. one split 
could represent a large percentage of the original bigRdd). So saving the 
split RDDs to disk (at least, naively) could be a challenge.

Is there any better way of doing this?

Thanks!
   Máximo






Re: Spark streaming -> cassandra : Fault Tolerance

2015-09-09 Thread Cody Koeninger
It's been a while since I've looked at the cassandra connector, so I can't
give you specific advice on it.

But in general, if a spark task fails (uncaught exception), it will be
retried automatically.  In the case of the kafka direct stream rdd, it will
have exactly the same messages as the first attempt (as long as they're
still in the kafka log).

If you or the cassandra connector are catching the exception, the task
won't be retried automatically and it's up to you to deal with it.



On Wed, Sep 9, 2015 at 2:09 PM, Samya  wrote:

> Hi Team,
>
> I have an sample spark application which reads from Kafka using direct API
> &
> then does some transformation & stores to cassandra (using
> saveToCassandra()).
>
> If Cassandra goes down, the application logs a NoHostAvailable exception (as
> expected). But in the meantime the new incoming messages are lost, as the
> direct API creates a new checkpoint & deletes the previous ones.
>
> Does that mean, I should handle the exception at application side?
>
> Or is there any other hook to handle the same?
>
> Thanks in advance.
>
> Regards,
> Sam
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-streaming-cassandra-Fault-Tolerance-tp24625.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Driver OOM after upgrading to 1.5

2015-09-09 Thread Sandy Ryza
Java 7.

FWIW I was just able to get it to work by increasing MaxPermSize to 256m.

-Sandy

On Wed, Sep 9, 2015 at 11:37 AM, Reynold Xin  wrote:

> Java 7 / 8?
>
> On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza 
> wrote:
>
>> I just upgraded the spark-timeseries
>>  project to run on top of
>> 1.5, and I'm noticing that tests are failing with OOMEs.
>>
>> I ran a jmap -histo on the process and discovered the top heap items to
>> be:
>>1:163428   22236064  
>>2:163428   21112648  
>>3: 12638   14459192  
>>4: 12638   13455904  
>>5: 105397642528  
>>
>> Not sure whether this is suspicious.  Any ideas?
>>
>> -Sandy
>>
>
>


Re: [Spark on Amazon EMR] : File does not exist: hdfs://ip-x-x-x-x:/.../spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

2015-09-09 Thread Dean Wampler
If you log into the cluster, do you see the file if you type:

hdfs dfs -ls hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar

(with the correct server address for "ipx-x-x-x"). If not, is the server
address correct and routable inside the cluster? Recall that EC2 instances
have both public and private host names & IP addresses.

Also, is the port number correct for HDFS in the cluster?

dean

Dean Wampler, Ph.D.
Author: Programming Scala, 2nd Edition
 (O'Reilly)
Typesafe 
@deanwampler 
http://polyglotprogramming.com

On Wed, Sep 9, 2015 at 9:28 AM, shahab  wrote:

> Hi,
> I am using Spark on Amazon EMR. So far I have not succeeded in submitting
> the application successfully; I'm not sure what the problem is. In the log
> file I see the following:
> java.io.FileNotFoundException: File does not exist:
> hdfs://ipx-x-x-x:8020/user/hadoop/.sparkStaging/application_123344567_0018/spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar
>
> However, even putting spark-assembly-1.4.1-hadoop2.6.0-amzn-0.jar in the
> fat jar didn't solve the problem. I am out of clues now.
> I want to submit a Spark application, using the AWS web console, as a step.
> I submit the application as: spark-submit --deploy-mode cluster --class
> mypack.MyMainClass --master yarn-cluster s3://mybucket/MySparkApp.jar
> Has anyone had a similar problem with EMR?
>
> best,
> /Shahab
>


Re: Driver OOM after upgrading to 1.5

2015-09-09 Thread Reynold Xin
It's likely that with codegen, you need a bigger permgen space. Also I
found that Java 7 doesn't do very well w.r.t. flushing code cache. As a
result, Spark SQL and DataFrames now run much better under Java 8, because
it flushes code cache better.
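
For anyone hitting the same thing, a sketch of one way to wire the flag in
(Java 7 only - permgen is gone in Java 8; the value is illustrative). For the
driver it has to be set before the JVM starts, e.g. spark.driver.extraJavaOptions
in spark-defaults.conf or --driver-java-options on spark-submit; for executors
it can go in the application's conf:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=256m")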


On Wed, Sep 9, 2015 at 2:12 PM, Sandy Ryza  wrote:

> Java 7.
>
> FWIW I was just able to get it to work by increasing MaxPermSize to 256m.
>
> -Sandy
>
> On Wed, Sep 9, 2015 at 11:37 AM, Reynold Xin  wrote:
>
>> Java 7 / 8?
>>
>> On Wed, Sep 9, 2015 at 10:10 AM, Sandy Ryza 
>> wrote:
>>
>>> I just upgraded the spark-timeseries
>>>  project to run on top of
>>> 1.5, and I'm noticing that tests are failing with OOMEs.
>>>
>>> I ran a jmap -histo on the process and discovered the top heap items to
>>> be:
>>>1:163428   22236064  
>>>2:163428   21112648  
>>>3: 12638   14459192  
>>>4: 12638   13455904  
>>>5: 105397642528  
>>>
>>> Not sure whether this is suspicious.  Any ideas?
>>>
>>> -Sandy
>>>
>>
>>
>


Re: What should be the optimal value for spark.sql.shuffle.partition?

2015-09-09 Thread Richard Marscher
Do you have any details about the cluster you are running this against? The
memory per executor/node, number of executors, and such? Even at a shuffle
setting of 1000 that would be roughly 1GB per partition assuming the 1TB of
data includes overheads in the JVM. Maybe try another order of magnitude
higher for number of shuffle partitions and see where that gets you?
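
For reference, a small sketch of bumping it per application (the value and the
table name are illustrative; the rough target is partitions small enough to
fit comfortably in each executor's shuffle memory):

import org.apache.spark.sql.hive.HiveContext

def runWithMorePartitions(hiveContext: HiveContext): Unit = {
  hiveContext.setConf("spark.sql.shuffle.partitions", "10000")
  hiveContext.sql("SELECT key, count(*) FROM big_table GROUP BY key")
}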

On Tue, Sep 1, 2015 at 12:11 PM, unk1102  wrote:

> Hi, I am using Spark SQL - actually hiveContext.sql() - with group by
> queries, and I am running into OOM issues. So I am thinking of increasing
> the value of spark.sql.shuffle.partitions from the default of 200 to 1000,
> but it is not helping. Please correct me if I am wrong: these partitions
> will share the data shuffle load, so the more partitions there are, the
> less data each has to hold. Please guide me, I am new to Spark. I am using
> Spark 1.4.0 and I have around 1 TB of uncompressed data to process
> using hiveContext.sql() group by queries.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/What-should-be-the-optimal-value-for-spark-sql-shuffle-partition-tp24543.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com  | Our Blog
 | Twitter  |
Facebook  | LinkedIn



Re: spark.shuffle.spill=false ignored?

2015-09-09 Thread Richard Marscher
Hi Eric,

I just wanted to do a sanity check: do you know what paths it is trying to
write to? I ask because even without spilling, shuffles always write to
disk first before transferring data across the network. I had at one point
encountered this myself where we accidentally had /tmp mounted on a tiny
disk and kept running out of disk on shuffles even though we also don't
spill. You may have already considered or ruled this out though.
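
A small sketch of pointing that scratch space at a volume with room (the path
is illustrative; on YARN the directories come from yarn.nodemanager.local-dirs
instead, and EMR normally configures them for you):

import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.local.dir", "/mnt/large-disk/spark-tmp")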

On Thu, Sep 3, 2015 at 12:56 PM, Eric Walker  wrote:

> Hi,
>
> I am using Spark 1.3.1 on EMR with lots of memory.  I have attempted to
> run a large pyspark job several times, specifying
> `spark.shuffle.spill=false` in different ways.  It seems that the setting
> is ignored, at least partially, and some of the tasks start spilling large
> amounts of data to disk.  The job has been fast enough in the past, but
> once it starts spilling to disk it lands on Miller's planet [1].
>
> Is this expected behavior?  Is it a misconfiguration on my part, e.g.,
> could there be an incompatible setting that is overriding
> `spark.shuffle.spill=false`?  Is it something that goes back to Spark
> 1.3.1?  Is it something that goes back to EMR?  When I've allowed the job
> to continue on for a while, I've started to see Kryo stack traces in the
> tasks that are spilling to disk.  The stack traces mention there not being
> enough disk space, although a `df` shows plenty of space (perhaps after the
> fact, when temporary files have been cleaned up).
>
> Has anyone run into something like this before?  I would be happy to see
> OOM errors, because that would be consistent with one understanding of what
> might be going on, but I haven't yet.
>
> Eric
>
>
> [1] https://www.youtube.com/watch?v=v7OVqXm7_Pk=active
>



-- 
*Richard Marscher*
Software Engineer
Localytics
Localytics.com  | Our Blog
 | Twitter  |
Facebook  | LinkedIn



Re: Adding/subtracting org.apache.spark.mllib.linalg.Vector in Scala?

2015-09-09 Thread Burak Yavuz
By the way, I published http://spark-packages.org/package/brkyvz/lazy-linalg
that contains many of the arithmetic operations for use in Scala. I really
would appreciate any feedback!
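
In the meantime, a plain workaround for the problem described below that avoids
the implicits entirely - a sketch that just round-trips through Breeze (fine
for small local vectors):

import breeze.linalg.{DenseVector => BDV}
import org.apache.spark.mllib.linalg.{Vector => SparkVector, Vectors}

def add(a: SparkVector, b: SparkVector): SparkVector =
  Vectors.dense((BDV(a.toArray) + BDV(b.toArray)).toArray)

def scale(a: SparkVector, c: Double): SparkVector =
  Vectors.dense((BDV(a.toArray) * c).toArray)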

On Tue, Aug 25, 2015 at 11:06 AM, Kristina Rogale Plazonic  wrote:

> YES PLEASE!
>
> :)))
>
> On Tue, Aug 25, 2015 at 1:57 PM, Burak Yavuz  wrote:
>
>> Hmm. I have a lot of code on the local linear algebra operations using
>> Spark's Matrix and Vector representations
>> done for https://issues.apache.org/jira/browse/SPARK-6442.
>>
>> I can make a Spark package with that code if people are interested.
>>
>> Best,
>> Burak
>>
>> On Tue, Aug 25, 2015 at 10:54 AM, Kristina Rogale Plazonic <
>> kpl...@gmail.com> wrote:
>>
>>> However I do think it's easier than it seems to write the implicits;
 it doesn't involve new classes or anything. Yes it's pretty much just
 what you wrote. There is a class "Vector" in Spark. This declaration
 can be in an object; you don't implement your own class. (Also you can
 use "toBreeze" to get Breeze vectors.)
>>>
>>>
>>> The implicit conversion with the implicit def happens for the first
>>> vector in the sum, but not the second vector (see below).
>>>
>>> At this point I give up, because I spent way too much time.  I am so
>>> disappointed.  So many times I heard "Spark makes simple things easy and
>>> complicated things possible". Well, here is the simplest thing you can
>>> imagine in linear algebra, but heck, it is not easy or intuitive.  It was
>>> easier to run a DeepLearning algo (from another library) than add two
>>> vectors.
>>>
>>> If anybody has a workaround other than implementing your own
>>> add/substract/scalarMultiply, PLEASE let me know.
>>>
>>> Here is the code and error from (freshly started) spark-shell:
>>>
>>> scala> import breeze.linalg.{DenseVector => BDV, SparseVector => BSV,
>>> Vector => BV}
>>> import breeze.linalg.{DenseVector=>BDV, SparseVector=>BSV, Vector=>BV}
>>>
>>> scala> import org.apache.spark.mllib.linalg.Vectors
>>> import org.apache.spark.mllib.linalg.Vectors
>>>
>>> scala> val v1 = Vectors.dense(1.0, 2.0, 3.0)
>>> v1: org.apache.spark.mllib.linalg.Vector = [1.0,2.0,3.0]
>>>
>>> scala> import org.apache.spark.mllib.linalg.{Vector =>SparkVector}
>>> import org.apache.spark.mllib.linalg.{Vector=>SparkVector}
>>>
>>> scala> object MyUtils {
>>>  |   implicit def toBreeze(v:SparkVector) = BV(v.toArray)
>>>  | }
>>> warning: there were 1 feature warning(s); re-run with -feature for
>>> details
>>> defined module MyUtils
>>>
>>> scala> import MyUtils._
>>> import MyUtils._
>>>
>>> scala> v1:BV[Double]
>>> res2: breeze.linalg.Vector[Double] = DenseVector(1.0, 2.0, 3.0)
>>>
>>> scala> v1 + v1
>>> :30: error: could not find implicit value for parameter op:
>>> breeze.linalg.operators.OpAdd.Impl2[breeze.linalg.Vector[Double],org.apache.spark.mllib.linalg.Vector,That]
>>>   v1 + v1
>>>  ^
>>>
>>>
>>>
>>
>>
>

