Re: Worker is KILLED for no reason

2015-06-23 Thread Demi Ben-Ari
Hi,
I've opened a bug against the Spark project on JIRA:
https://issues.apache.org/jira/browse/SPARK-8557

Would really appreciate some insights on the issue;
*it's strange that no one else has encountered the problem.*

Have a great day!

On Mon, Jun 15, 2015 at 12:03 PM, nizang  wrote:

> hi,
>
> I'm using the new 1.4.0 installation, and ran a job there. The job finished
> and everything seems fine. When I enter the application, I can see that the
> job is marked as KILLED:
>
> Removed Executors
>
> ExecutorID  Worker  Cores   Memory  State   Logs
> 0   worker-20150615080550-172.31.11.225-51630   4   10240
>  KILLED  stdout stderr
>
> when I enter the worker itself, I can see it marked as EXITED:
>
>
> ExecutorID  Cores   State   Memory  Job Details Logs
> 0   4   EXITED  10.0 GB
> ID: app-20150615080601-
> Name: dev.app.name
> User: root
> stdout stderr
>
> no interesting things in the stdout or stderr
>
> Why is the job marked as KILLED in the application page?
>
> this is the only job I ran, and the job that was in this executors. Also,
> by
> checking the logs and output things seems to run fine
>
> thanks, nizan
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Worker-is-KILLED-for-no-reason-tp23314.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


-- 
Best regards,
Demi Ben-Ari 
Senior Software Engineer
Windward Ltd. 


Re: When to use underlying data management layer versus standalone Spark?

2015-06-23 Thread Sonal Goyal
When you deploy Spark over Hadoop, you typically want to leverage the
replication of HDFS, or your data is already in Hadoop. Similarly, if your data
is already in Cassandra, or if you want updatable atomic row operations and
direct access to your data as well as the ability to run analytic jobs, that may
be another case for deploying on top of a data management layer.
On Jun 24, 2015 1:17 AM, "commtech"  wrote:

> Hi,
>
> I work at a large financial institution in New York. We're looking into
> Spark and trying to learn more about the deployment/use cases for real-time
> analytics with Spark. When would it be better to deploy standalone Spark
> versus Spark on top of a more comprehensive data management layer (Hadoop,
> Cassandra, MongoDB, etc.)? If you do deploy on top of one of these, are
> there different use cases where one of these database management layers are
> better or worse?
>
> Any color would be very helpful. Thank you in advance.
>
> Sincerely,
> Michael
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/When-to-use-underlying-data-management-layer-versus-standalone-Spark-tp23455.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Should I keep memory dedicated for HDFS and Spark on cluster nodes?

2015-06-23 Thread Akhil Das
Depending on how much memory you have, you could allocate 60-80% of it to the
Spark worker process. The DataNode doesn't require much memory.
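A minimal sketch of what that split could look like for a standalone deployment
(the 64 GB node size and the exact numbers are only illustrative, not from this
thread):

# conf/spark-env.sh on each worker node (assuming a 64 GB node)
export SPARK_WORKER_MEMORY=48g   # roughly 75% of the node for Spark executors
export SPARK_WORKER_CORES=12
# Leave the remainder for the OS page cache and the DataNode; the DataNode heap
# can stay small, e.g. in hadoop-env.sh:
#   export HADOOP_DATANODE_OPTS="-Xmx1g $HADOOP_DATANODE_OPTS"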
On 23 Jun 2015 21:26, "maxdml"  wrote:

> I'm wondering if there is a real benefit for splitting my memory in two for
> the datanode/workers.
>
> Datanodes and OS needs memory to perform their business. I suppose there
> could be loss of performance if they came to compete for memory with the
> worker(s).
>
> Any opinion? :-)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Should-I-keep-memory-dedicated-for-HDFS-and-Spark-on-cluster-nodes-tp23451.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark and HDFS ( Worker and Data Nodes Combination )

2015-06-23 Thread Akhil Das
I think this is how it works: RDDs have partitions, which are made up of
blocks, and the BlockManager knows where these blocks are available. Based on
that availability (PROCESS_LOCAL, NODE_LOCAL, etc.), Spark launches the tasks
on those nodes. This behaviour can be controlled with
spark.locality.wait
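A minimal sketch (values are illustrative, not from this thread) of raising that
wait so the scheduler holds out longer for a local slot before falling back to a
less local one:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("locality-tuning")
  // Default is 3s; on releases before 1.4 the value is in milliseconds (e.g. 10000).
  // Per-level overrides also exist: spark.locality.wait.process/.node/.rack.
  .set("spark.locality.wait", "10s")
val sc = new SparkContext(conf)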
On 22 Jun 2015 19:21, "ayan guha"  wrote:

I have a basic qs: how spark assigns partition to an executor? Does it
respect data locality? Does this behaviour depend on cluster manager, ie
yarn vs standalone?
On 22 Jun 2015 22:45, "Akhil Das"  wrote:

> Option 1 should be fine, Option 2 would bound a lot on network as the data
> increase in time.
>
> Thanks
> Best Regards
>
> On Mon, Jun 22, 2015 at 5:59 PM, Ashish Soni 
> wrote:
>
>> Hi All  ,
>>
>> What is the Best Way to install and Spark Cluster along side with Hadoop
>> Cluster , Any recommendation for below deployment topology will be a great
>> help
>>
>> *Also Is it necessary to put the Spark Worker on DataNodes as when it
>> read block from HDFS it will be local to the Server / Worker or  I can put
>> the Worker on any other nodes and if i do that will it affect the
>> performance of the Spark Data Processing ..*
>>
>> Hadoop Option 1
>>
>> Server 1 - NameNode   & Spark Master
>> Server 2 - DataNode 1  & Spark Worker
>> Server 3 - DataNode 2  & Spark Worker
>> Server 4 - DataNode 3  & Spark Worker
>>
>> Hadoop Option 2
>>
>>
>> Server 1 - NameNode
>> Server 2 - Spark Master
>> Server 2 - DataNode 1
>> Server 3 - DataNode 2
>> Server 4 - DataNode 3
>> Server 5 - Spark Worker 1
>> Server 6 - Spark Worker 2
>> Server 7 - Spark Worker 3
>>
>> Thanks.
>>
>>
>>
>>
>


Re: when cached RDD will unpersist its data

2015-06-23 Thread eric wong
If memory cannot hold all of the cached RDDs, the BlockManager will evict
older blocks to make room for the new RDD blocks.
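For illustration, a minimal sketch (bigRdd and otherRdd are hypothetical) of how
the chosen storage level decides what happens to an evicted block:

import org.apache.spark.storage.StorageLevel

// MEMORY_ONLY blocks that get evicted are dropped and recomputed from lineage
// the next time they are needed; MEMORY_AND_DISK blocks are spilled to disk
// instead of being lost.
val hot  = bigRdd.persist(StorageLevel.MEMORY_ONLY)
val warm = otherRdd.persist(StorageLevel.MEMORY_AND_DISK)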


Hope that is helpful.

2015-06-24 13:22 GMT+08:00 bit1...@163.com :

> I am kind of confused about when a cached RDD will unpersist its data. I
> know we can explicitly unpersist it with RDD.unpersist, but can it be
> unpersisted automatically by the Spark framework?
> Thanks.
>
> --
> bit1...@163.com
>



-- 
王海华


when cached RDD will unpersist its data

2015-06-23 Thread bit1...@163.com
I am kind of confused about when a cached RDD will unpersist its data. I know we
can explicitly unpersist it with RDD.unpersist, but can it be unpersisted
automatically by the Spark framework?
Thanks.



bit1...@163.com


Re: Yarn application ID for Spark job on Yarn

2015-06-23 Thread canan chen
I don't think there is YARN-related information to access from within Spark;
Spark doesn't depend on YARN.

BTW, why do you want the YARN application ID?

On Mon, Jun 22, 2015 at 11:45 PM, roy  wrote:

> Hi,
>
>   Is there a way to get Yarn application ID inside spark application, when
> running spark Job on YARN ?
>
> Thanks
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Yarn-application-ID-for-Spark-job-on-Yarn-tp23429.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark launching without all of the requested YARN resources

2015-06-23 Thread canan chen
Why do you want it to wait until all the resources are ready? Starting it
as early as possible should make it complete earlier and increase the
utilization of resources.

On Tue, Jun 23, 2015 at 10:34 PM, Arun Luthra  wrote:

> Sometimes if my Hortonworks yarn-enabled cluster is fairly busy, Spark
> (via spark-submit) will begin its processing even though it apparently did
> not get all of the requested resources; it is running very slowly.
>
> Is there a way to force Spark/YARN to only begin when it has the full set
> of resources that I request?
>
> Thanks,
> Arun
>


Re: When to use underlying data management layer versus standalone Spark?

2015-06-23 Thread canan chen
I don't think this is quite the right question. Spark can be deployed on
different cluster manager frameworks such as standalone, YARN, and Mesos.
Spark can't run without one of these cluster manager frameworks, which means
Spark depends on a cluster manager framework.

The data management layer, on the other hand, is upstream of Spark and
independent of it, although Spark does provide APIs to access different data
management layers. Which data store to use should depend on your upstream
application; it's not really a Spark decision.


On Wed, Jun 24, 2015 at 3:46 AM, commtech  wrote:

> Hi,
>
> I work at a large financial institution in New York. We're looking into
> Spark and trying to learn more about the deployment/use cases for real-time
> analytics with Spark. When would it be better to deploy standalone Spark
> versus Spark on top of a more comprehensive data management layer (Hadoop,
> Cassandra, MongoDB, etc.)? If you do deploy on top of one of these, are
> there different use cases where one of these database management layers are
> better or worse?
>
> Any color would be very helpful. Thank you in advance.
>
> Sincerely,
> Michael
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/When-to-use-underlying-data-management-layer-versus-standalone-Spark-tp23455.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: map V mapPartitions

2015-06-23 Thread canan chen
One example is when you'd like to set up a JDBC connection for each partition
and share this connection across the records.

mapPartitions is much like the mapper paradigm in MapReduce: in a MapReduce
mapper, you have a setup method to do any initialization before processing the
split, and then you read and process the records one by one in the map method.
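A minimal sketch of that pattern (the JDBC URL, lookup table, and the assumption
that rdd is an RDD[String] of keys are all hypothetical; the JDBC driver jar must
be on the executor classpath):

import java.sql.DriverManager

val enriched = rdd.mapPartitions { records =>
  // One connection per partition, shared by every record in it.
  val conn = DriverManager.getConnection("jdbc:postgresql://host/db")
  val stmt = conn.prepareStatement("SELECT label FROM lookup WHERE key = ?")
  // Materialize the partition before closing the connection, because the
  // iterator returned by mapPartitions is consumed lazily.
  val result = records.map { key =>
    stmt.setString(1, key)
    val rs = stmt.executeQuery()
    val label = if (rs.next()) rs.getString(1) else "unknown"
    rs.close()
    (key, label)
  }.toList
  stmt.close()
  conn.close()
  result.iterator
}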

On Wed, Jun 24, 2015 at 8:03 AM, Holden Karau  wrote:

> I think one of the primary cases where mapPartitions is useful if you are
> going to be doing any setup work that can be re-used between processing
> each element, this way the setup work only needs to be done once per
> partition (for example creating an instance of jodatime).
>
> Both map and mapPartitions are implemented using the MapPartitionsRDD.
>
> In general if your logic is easily expressed with map, and there isn't any
> setup work you are doing that could be shared, using map instead of map
> partitions tends to result in more readable code which is valuable in and
> off its self.
>
> On Tue, Jun 23, 2015 at 4:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) 
> wrote:
>
>> I know when to use a map () but when should i use mapPartitions() ?
>>
>> Which is faster ?
>>
>> --
>> Deepak
>>
>>
>
>
> --
> Cell : 425-233-8271
> Twitter: https://twitter.com/holdenkarau
> Linked In: https://www.linkedin.com/in/holdenkarau
>


Re: flume sinks supported by spark streaming

2015-06-23 Thread Ruslan Dautkhanov
https://spark.apache.org/docs/latest/streaming-flume-integration.html

Yep, the Avro sink is the correct one.
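A minimal sketch of the push-based (Avro sink) setup, assuming the
spark-streaming-flume artifact is on the classpath and that "receiver-host":9988
is where the Flume Avro sink points (both placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils

val conf = new SparkConf().setAppName("FlumeAvroSinkExample")
val ssc = new StreamingContext(conf, Seconds(10))

// Each SparkFlumeEvent wraps the original Flume event; its body is a ByteBuffer.
val stream = FlumeUtils.createStream(ssc, "receiver-host", 9988)
stream.map(e => new String(e.event.getBody.array(), "UTF-8")).print()

ssc.start()
ssc.awaitTermination()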



-- 
Ruslan Dautkhanov

On Tue, Jun 23, 2015 at 9:46 PM, Hafiz Mujadid 
wrote:

> Hi!
>
>
> I want to integrate flume with spark streaming. I want to know which sink
> type of flume are supported by spark streaming? I saw one example using
> avroSink.
>
> Thanks
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/flume-sinks-supported-by-spark-streaming-tp23462.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark standalone cluster - resource management

2015-06-23 Thread canan chen
Check the available resources (CPU cores & memory) on the master web UI.
The log message you are seeing means the job can't get any resources.



On Wed, Jun 24, 2015 at 5:03 AM, Nizan Grauer  wrote:

> I'm having 30G per machine
>
> This is the first (and only) job I'm trying to submit. So it's weird that
> for --total-executor-cores=20 it works, and for --total-executor-cores=4
> it doesn't
>
> On Tue, Jun 23, 2015 at 10:46 PM, Igor Berman 
> wrote:
>
>> probably there are already running jobs there
>> in addition, memory is also a resource, so if you are running 1
>> application that took all your memory and then you are trying to run
>> another application that asks
>> for the memory the cluster doesn't have then the second app wont be
>> running
>>
>> so why are u specifying 22g as executor memory? how much memory you have
>> for each machine?
>>
>> On 23 June 2015 at 09:33, nizang  wrote:
>>
>>> to give a bit more data on what I'm trying to get -
>>>
>>> I have many tasks I want to run in parallel, so I want each task to catch
>>> small part of the cluster (-> only limited part of my 20 cores in the
>>> cluster)
>>>
>>> I have important tasks that I want them to get 10 cores, and I have small
>>> tasks that I want to run with only 1 or 2 cores)
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-resource-management-tp23444p23445.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>>
>>> -
>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>
>>>
>>
>


flume sinks supported by spark streaming

2015-06-23 Thread Hafiz Mujadid
Hi!


I want to integrate flume with spark streaming. I want to know which sink
type of flume are supported by spark streaming? I saw one example using
avroSink.

Thanks





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/flume-sinks-supported-by-spark-streaming-tp23462.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka createDirectStream ​issue

2015-06-23 Thread Tathagata Das
Please run it in your own application and not in the spark shell. I see
that you are trying to stop the SparkContext and create a new
StreamingContext; that will lead to unexpected issues like the one you are
seeing. Please build a standalone SBT/Maven app for Spark Streaming.
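A minimal build.sbt sketch for such a standalone app (artifact versions match the
1.4.0 release discussed in this thread; adjust as needed):

name := "kafka-direct-stream"

scalaVersion := "2.10.4"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming"       % "1.4.0" % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % "1.4.0"
)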

On Tue, Jun 23, 2015 at 3:43 PM, syepes  wrote:

> yes, I have two clusters one standalone an another using Mesos
>
>  Sebastian YEPES
>http://sebastian-yepes.com
>
> On Wed, Jun 24, 2015 at 12:37 AM, drarse [via Apache Spark User List] <[hidden
> email] > wrote:
>
>> Hi syepes,
>> Are u run the application in standalone mode?
>> Regards
>> El 23/06/2015 22:48, "syepes [via Apache Spark User List]" <[hidden
>> email] > escribió:
>>
>>> Hello,
>>>
>>> I ​am trying ​use the new Kafka ​consumer
>>> ​​"KafkaUtils.createDirectStream"​ but I am having some issues making it
>>> work.
>>> I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363
>>> and I am still getting the same strange exception "ClassNotFoundException:
>>> $line49.$read$$iwC$$i"
>>>
>>> Has anyone else been facing this kind of problem?
>>>
>>> The following is the code and logs that I have been using to reproduce
>>> the issue:
>>>
>>> spark-shell: script
>>> --
>>> sc.stop()
>>> import _root_.kafka.serializer.StringDecoder
>>> import org.apache.spark.SparkConf
>>> import org.apache.spark.streaming._
>>> import org.apache.spark.streaming.kafka.KafkaUtils
>>>
>>> val sparkConf = new
>>> SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
>>> "4041" ).set("spark.driver.allowMultipleContexts",
>>> "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
>>>
>>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>>>
>>> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>>> "localhost:9092", "schema.registry.url" -> "http://localhost:8081";,
>>> "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
>>> val topic = Set("test")
>>> val messages = KafkaUtils.createDirectStream[String, String,
>>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>>
>>> val raw = messages.map(_._2)
>>> val words = raw.flatMap(_.split(" "))
>>> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>>> wordCounts.print()
>>>
>>> ssc.start()
>>> ssc.awaitTermination()
>>> --
>>>
>>>
>>> spark-shell: output
>>> --
>>> sparkConf: org.apache.spark.SparkConf =
>>> org.apache.spark.SparkConf@330e37b2
>>> ssc: org.apache.spark.streaming.StreamingContext =
>>> org.apache.spark.streaming.StreamingContext@28ec9c23
>>> kafkaParams: scala.collection.immutable.Map[String,String] =
>>> Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
>>> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
>>> OPC)topic: scala.collection.immutable.Set[String] = Set(test)
>>> WARN  [main] kafka.utils.VerifiableProperties - Property
>>> schema.registry.url is not valid
>>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>>> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
>>> raw: org.apache.spark.streaming.dstream.DStream[String] =
>>> org.apache.spark.streaming.dstream.MappedDStream@578ce232
>>> words: org.apache.spark.streaming.dstream.DStream[String] =
>>> org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
>>> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
>>> org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
>>> WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
>>> schema.registry.url is not valid
>>> WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
>>> Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
>>> java.lang.ClassNotFoundException:
>>> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>>> at java.lang.Class.forName0(Native Method)
>>> at java.lang.Class.forName(Class.java:348)
>>> at
>>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>>>
>>> at
>>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>>> at
>>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>>> at
>>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>>> at
>>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>>> at
>>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>>> 

Re: mutable vs. pure functional implementation - StatCounter

2015-06-23 Thread Xiangrui Meng
Creating millions of temporary (immutable) objects is bad for
performance. It should be simple to do a micro-benchmark locally.
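A rough local sketch of such a micro-benchmark (the two toy stat classes and the
input size are made up; there is no JIT warm-up, so treat the numbers as
indicative only):

case class ImmutableStat(count: Long, sum: Double) {
  def merge(x: Double): ImmutableStat = ImmutableStat(count + 1, sum + x)  // new object per merge
}
class MutableStat {
  var count = 0L; var sum = 0.0
  def merge(x: Double): this.type = { count += 1; sum += x; this }         // updates in place
}

def time[T](label: String)(f: => T): T = {
  val t0 = System.nanoTime(); val r = f
  println(s"$label: ${(System.nanoTime() - t0) / 1e6} ms"); r
}

val xs = Array.fill(10000000)(math.random)
time("mutable")   { val s = new MutableStat; xs.foreach(s.merge); s.sum }
time("immutable") { xs.foldLeft(ImmutableStat(0L, 0.0))(_ merge _).sum }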
-Xiangrui

On Mon, Jun 22, 2015 at 7:25 PM, mzeltser  wrote:
> Using StatCounter as an example, I'd like to understand if "pure" functional
> implementation would be more or less beneficial for "accumulating"
> structures used inside RDD.map
>
> StatCounter.merge is updating mutable class variables and returning
> reference to same object. This is clearly a non-functional implementation
> and it mutates existing state of the instance. (Unless I'm missing
> something)
>
> Would it be preferable to have all the class variables declared as val and
> create new instance to hold merged values?
>
> The StatCounter would be used inside the RDD.map to collect stats on the
> fly.
> Would mutable state present bottleneck?
>
> Can anybody comment on why non-functional implementation has been chosen?
>
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/mutable-vs-pure-functional-implementation-StatCounter-tp23441.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: which mllib algorithm for large multi-class classification?

2015-06-23 Thread Xiangrui Meng
We have multinomial logistic regression implemented. For your case,
the model size is 500 * 300,000 = 150,000,000. MLlib's implementation
might not be able to handle it efficiently; we plan to have a more
scalable implementation in 1.5. However, it shouldn't give you an
"array larger than MaxInt" exception. Could you paste the stack trace?
-Xiangrui

On Mon, Jun 22, 2015 at 4:21 PM, Danny  wrote:
> hi,
>
> I am unfortunately not very fit in the whole MLlib stuff, so I would
> appreciate a little help:
>
> Which multi-class classification algorithm i should use if i want to train
> texts (100-1000 words each) into categories. The number of categories is
> between 100-500 and the number of training documents which i have transform
> to tf-idf vectors is max ~ 300.000
>
> it looks like the most algorithms are running into OOM exception or "array
> larger than MaxInt" exceptions with a large number of classes/categories
> cause there are "collect" steps in it?
>
> thanks a lot
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/which-mllib-algorithm-for-large-multi-class-classification-tp23439.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How could output the StreamingLinearRegressionWithSGD prediction result?

2015-06-23 Thread Xiangrui Meng
Please check the input path to your test data, and call `.count()` to
see whether there are records in it. -Xiangrui
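A quick sanity check along those lines (paths are placeholders; this is a sketch,
not taken from the tutorial):

import org.apache.spark.mllib.regression.LabeledPoint

val testData = ssc.textFileStream("hdfs:///path/to/test-dir").map(LabeledPoint.parse)
testData.count().print()   // should print a non-zero count once files arrive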

On Sat, Jun 20, 2015 at 9:23 PM, Gavin Yue  wrote:
> Hey,
>
> I am testing the StreamingLinearRegressionWithSGD following the tutorial.
>
>
> It works, but I could not output the prediction results. I tried the
> saveAsTextFile, but it only output _SUCCESS to the folder.
>
>
> I am trying to check the prediction results and use
> BinaryClassificationMetrics to get areaUnderROC.
>
>
> Any example for this?
>
> Thank you !

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Failed stages and dropped executors when running implicit matrix factorization/ALS

2015-06-23 Thread Xiangrui Meng
It shouldn't be hard to handle 1 billion ratings in 1.3. I just need
more information to guess what happened:

1. Could you share the ALS settings, e.g., number of blocks, rank and
number of iterations, as well as number of users/items in your
dataset?
2. If you monitor the progress in the WebUI, how much data is stored
in memory and how much data is shuffled per iteration?
3. Do you have enough disk space for the shuffle files?
4. Did you set checkpointDir in SparkContext and checkpointInterval in ALS?
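A hedged sketch of items 1 and 4 (the rating RDD, paths, and parameter values are
placeholders; setCheckpointInterval is assumed to be available in your Spark
version):

import org.apache.spark.mllib.recommendation.ALS

sc.setCheckpointDir("hdfs:///tmp/als-checkpoints")

val model = new ALS()
  .setImplicitPrefs(true)
  .setRank(40)
  .setIterations(15)
  .setLambda(0.01)
  .setAlpha(40.0)
  .setBlocks(200)             // number of user/product blocks
  .setCheckpointInterval(5)   // cut the lineage every few iterations
  .run(ratings)               // ratings: RDD[Rating]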

Best,
Xiangrui

On Fri, Jun 19, 2015 at 11:43 AM, Ravi Mody  wrote:
> Hi, I'm running implicit matrix factorization/ALS in Spark 1.3.1 on fairly
> large datasets (1+ billion input records). As I grow my dataset I often run
> into issues with a lot of failed stages and dropped executors, ultimately
> leading to the whole application failing. The errors are like
> "org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output
> location for shuffle 19" and "org.apache.spark.shuffle.FetchFailedException:
> Failed to connect to...". These occur during flatMap, mapPartitions, and
> aggregate stages. I know that increasing memory fixes this issue, but most
> of the time my executors are only using a tiny portion of the their
> allocated memory (<10%). Often, the stages run fine until the last iteration
> or two of ALS, but this could just be a coincidence.
>
> I've tried tweaking a lot of settings, but it's time-consuming to do this
> through guess-and-check. Right now I have these set:
> spark.shuffle.memoryFraction = 0.3
> spark.storage.memoryFraction = 0.65
> spark.executor.heartbeatInterval = 60
>
> I'm sure these settings aren't optimal - any idea of what could be causing
> my errors, and what direction I can push these settings in to get more out
> of my memory? I'm currently using 240 GB of memory (on 7 executors) for a 1
> billion record dataset, which seems like too much. Thanks!

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Issue running Spark 1.4 on Yarn

2015-06-23 Thread Matt Kapilevich
Hi Kevin, I never did. I checked for free space in the root partition and don't
think that was the issue. Now that 1.4 is officially out, I'll probably give
it another shot.
On Jun 22, 2015 4:28 PM, "Kevin Markey"  wrote:

>  Matt:  Did you ever resolve this issue?  When running on a cluster or
> pseudocluster with too little space for /tmp or /var files, we've seen this
> sort of behavior.  There's enough memory, and enough HDFS space, but
> there's insufficient space on one or more nodes for other temporary files
> as logs grow and don't get cleared or deleted.  Depends on your
> configuration.  Often restarting will temporarily fix things, but for
> shorter and shorter periods of time until nothing works.
>
> Fix is to expand space available for logs, pruning them, a cron job to
> prune them periodically, and/or modifying limits on logs.
>
> Kevin
>
> On 06/09/2015 04:15 PM, Matt Kapilevich wrote:
>
> I've tried running a Hadoop app pointing to the same queue. Same thing
> now, the job doesn't get accepted. I've cleared out the queue and killed
> all the pending jobs, the queue is still unusable.
>
>  It seems like an issue with YARN, but it's specifically Spark that
> leaves the queue in this state. I've ran a Hadoop job in a for loop 10x,
> while specifying the queue explicitly, just to double-check.
>
> On Tue, Jun 9, 2015 at 4:45 PM, Matt Kapilevich 
> wrote:
>
>> From the RM scheduler, I see 3 applications currently stuck in the
>> "root.thequeue" queue.
>>
>>  Used Resources: 
>> Num Active Applications: 0
>> Num Pending Applications: 3
>> Min Resources: 
>> Max Resources: 
>> Steady Fair Share: 
>> Instantaneous Fair Share: 
>>
>> On Tue, Jun 9, 2015 at 4:30 PM, Matt Kapilevich 
>> wrote:
>>
>>> Yes! If I either specify a different queue or don't specify a queue at
>>> all, it works.
>>>
>>> On Tue, Jun 9, 2015 at 4:25 PM, Marcelo Vanzin 
>>> wrote:
>>>
 Does it work if you don't specify a queue?

 On Tue, Jun 9, 2015 at 1:21 PM, Matt Kapilevich 
 wrote:

> Hi Marcelo,
>
>  Yes, restarting YARN fixes this behavior and it again works the
> first few times. The only thing that's consistent is that once Spark job
> submissions stop working, it's broken for good.
>
> On Tue, Jun 9, 2015 at 4:12 PM, Marcelo Vanzin 
> wrote:
>
>>  Apologies, I see you already posted everything from the RM logs
>> that mention your stuck app.
>>
>>  Have you tried restarting the YARN cluster to see if that changes
>> anything? Does it go back to the "first few tries work" behaviour?
>>
>>  I run 1.4 on top of CDH 5.4 pretty often and haven't seen anything
>> like this.
>>
>>
>> On Tue, Jun 9, 2015 at 1:01 PM, Marcelo Vanzin 
>> wrote:
>>
>>>  On Tue, Jun 9, 2015 at 11:31 AM, Matt Kapilevich <
>>> matve...@gmail.com> wrote:
>>>
  Like I mentioned earlier, I'm able to execute Hadoop jobs fine
 even now - this problem is specific to Spark.

>>>
>>>  That doesn't necessarily mean anything. Spark apps have different
>>> resource requirements than Hadoop apps.
>>>
>>> Check your RM logs for any line that mentions your Spark app id.
>>> That may give you some insight into what's happening or not.
>>>
>>> --
>>> Marcelo
>>>
>>
>>
>>
>>  --
>> Marcelo
>>
>
>


  --
 Marcelo

>>>
>>>
>>
>
>


Re: Spark FP-Growth algorithm for frequent sequential patterns

2015-06-23 Thread Xiangrui Meng
This is on the wish list for Spark 1.5. Assuming that the items within the
same transaction are distinct, we can still follow FP-Growth's
steps:

1. find frequent items
2. filter transactions and keep only frequent items
3. do NOT order by frequency
4. use suffix to partition the transactions (whether to use prefix or
suffix doesn't really matter in this case)
5. grow FP-tree locally on each partition (the data structure should
be the same)
6. generate frequent sub-sequences
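For comparison, a minimal sketch of the existing, order-agnostic FPGrowth API
(Spark 1.4 syntax; the click-path file and minimum support are placeholders) that
the steps above would adapt:

import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

val transactions: RDD[Array[String]] = sc.textFile("hdfs:///path/to/clickpaths")
  .map(_.trim.split(' '))

val model = new FPGrowth()
  .setMinSupport(0.01)
  .setNumPartitions(10)
  .run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
}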

+Feynman

Best,
Xiangrui

On Fri, Jun 19, 2015 at 10:51 AM, ping yan  wrote:
> Hi,
>
> I have a use case where I'd like to mine frequent sequential patterns
> (consider the clickpath scenario). Transaction A -> B doesn't equal
> Transaction B->A..
>
> From what I understand about FP-growth in general and the MLlib
> implementation of it, the orders are not preserved. Anyone can provide some
> insights or ideas in extending the algorithm to solve frequent sequential
> pattern mining problems?
>
> Thanks as always.
>
>
> Ping
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Settings for K-Means Clustering in Mlib for large data set

2015-06-23 Thread Xiangrui Meng
A rough estimate of the worst case memory requirement for driver is
about 2 * k * runs * numFeatures * numPartitions * 8 bytes. I put the factor of 2
at the beginning because the previous centers are still in memory while
receiving new center updates. -Xiangrui
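A back-of-the-envelope check of that estimate, using the numbers from this thread
(80,000 features, 10 partitions, runs = 1):

def worstCaseDriverBytes(k: Int, runs: Int, numFeatures: Int, numPartitions: Int): Long =
  2L * k * runs * numFeatures * numPartitions * 8L

Seq(1000, 1500).foreach { k =>
  val gb = worstCaseDriverBytes(k, 1, 80000, 10) / 1e9
  println(f"k = $k -> ~$gb%.1f GB on the driver (worst case)")
}
// k = 1000 -> ~12.8 GB, k = 1500 -> ~19.2 GB, which is why the jump from
// k = 1000 to k = 1500 needs a noticeably larger driver heap.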

On Fri, Jun 19, 2015 at 9:02 AM, Rogers Jeffrey
 wrote:
> Thanks. Setting the driver memory property  worked for  K=1000 . But when I
> increased K to1500 I get the following error:
>
> 15/06/19 09:38:44 INFO ContextCleaner: Cleaned accumulator 7
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.3.51:45157 in memory (size: 1568.0 B, free: 10.4 GB)
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.9.50:59356 in memory (size: 1568.0 B, free: 73.6 GB)
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.9.50:60934 in memory (size: 1568.0 B, free: 73.6 GB)
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.15.51:37825 in memory (size: 1568.0 B, free: 73.6 GB)
>
> 15/06/19 09:38:44 INFO BlockManagerInfo: Removed broadcast_34_piece0 on
> 172.31.15.51:60610 in memory (size: 1568.0 B, free: 73.6 GB)
>
> 15/06/19 09:38:44 INFO ContextCleaner: Cleaned shuffle 5
>
> Exception in thread "Thread-2" java.lang.OutOfMemoryError: Requested array
> size exceeds VM limit
>
> at java.util.Arrays.copyOf(Arrays.java:2367)
>
> at
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>
> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)
>
> at java.lang.StringBuilder.append(StringBuilder.java:214)
>
> at py4j.Protocol.getOutputCommand(Protocol.java:305)
>
> at py4j.commands.CallCommand.execute(CallCommand.java:82)
>
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>
> at java.lang.Thread.run(Thread.java:745)
>
> Exception in thread "Thread-300" java.lang.OutOfMemoryError: Requested array
> size exceeds VM limit
>
> at java.util.Arrays.copyOf(Arrays.java:2367)
>
> at
> java.lang.AbstractStringBuilder.expandCapacity(AbstractStringBuilder.java:130)
>
> at
> java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:114)
>
> at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:587)
>
> at java.lang.StringBuilder.append(StringBuilder.java:214)
>
> at py4j.Protocol.getOutputCommand(Protocol.java:305)
>
> at py4j.commands.CallCommand.execute(CallCommand.java:82)
>
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
>
>
> Is there any method/guideline through which I can understand the memory
> requirement before hand and make appropriate configurations?
>
> Regards,
> Rogers Jeffrey L
>
> On Thu, Jun 18, 2015 at 8:14 PM, Rogers Jeffrey 
> wrote:
>>
>> I am submitting the application from a python notebook. I am launching
>> pyspark as follows:
>>
>> SPARK_PUBLIC_DNS=ec2-54-165-202-17.compute-1.amazonaws.com
>> SPARK_WORKER_CORES=8 SPARK_WORKER_MEMORY=15g SPARK_MEM=30g OUR_JAVA_MEM=30g
>> SPARK_DAEMON_JAVA_OPTS="-XX:MaxPermSize=30g -Xms30g -Xmx30g" IPYTHON=1
>> PYSPARK_PYTHON=/usr/bin/python SPARK_PRINT_LAUNCH_COMMAND=1
>> ./spark/bin/pyspark --master
>> spark://54.165.202.17.compute-1.amazonaws.com:7077   --deploy-mode client
>>
>> I guess I should be adding another extra argument --conf
>> "spark.driver.memory=15g" . Is that correct?
>>
>> Regards,
>> Rogers Jeffrey L
>>
>> On Thu, Jun 18, 2015 at 7:50 PM, Xiangrui Meng  wrote:
>>>
>>> With 80,000 features and 1000 clusters, you need 80,000,000 doubles to
>>> store the cluster centers. That is ~600MB. If there are 10 partitions,
>>> you might need 6GB on the driver to collect updates from workers. I
>>> guess the driver died. Did you specify driver memory with
>>> spark-submit? -Xiangrui
>>>
>>> On Thu, Jun 18, 2015 at 12:22 PM, Rogers Jeffrey
>>>  wrote:
>>> > Hi All,
>>> >
>>> > I am trying to run KMeans clustering on a large data set with 12,000
>>> > points
>>> > and 80,000 dimensions.  I have a spark cluster in Ec2 stand alone mode
>>> > with
>>> > 8  workers running on 2 slaves with 160 GB Ram and 40 VCPU.
>>> >
>>> > My Code is as Follows:
>>> >
>>> > def convert_into_sparse_vector(A):
>>> > non_nan_indices=np.nonzero(~np.isnan(A) )
>>> > non_nan_values=A[non_nan_indices]
>>> > dictionary=dict(zip(non_nan_indices[0],non_nan_values))
>>> > return Vectors.sparse (len(A),dictionary)
>>> >
>>> > X=[convert_into_sparse_vector(A) for A in complete_dataframe.values ]
>>> > sc=SparkContext(appName="parallel_kmeans")
>>> > data=sc.parallelize(X,10)
>>> > model = KMeans.train(data, 1000, initializationMode="k-means||")
>>> >
>>> > where complete_dataframe is a pandas data frame that has my data.
>>> >
>>> > I get the error: Py4JNetworkError: An error occurred while trying to
>>> > connect
>>> > to the Java server.
>>> >
>>> > The error  trace is as follows:
>>> >
>>> >> 

[no subject]

2015-06-23 Thread ๏̯͡๏
I have a Spark job that has 7 stages. The first 3 stages complete and the
fourth stage begins (it joins two RDDs). This stage has multiple task
failures, all with the exception below.

Multiple tasks (hundreds of them) get the same exception on different hosts.
How can all the hosts suddenly stop responding when, a few moments ago, 3 stages
ran successfully? If I re-run the job, the first three stages will again run
successfully. I cannot think of it being a cluster issue.


Any suggestions ?


Spark Version : 1.3.1

Exception:

org.apache.spark.shuffle.FetchFailedException: Failed to connect to HOST
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$.org$apache$spark$shuffle$hash$BlockStoreShuffleFetcher$$unpackBlock$1(BlockStoreShuffleFetcher.scala:67)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at 
org.apache.spark.shuffle.hash.BlockStoreShuffleFetcher$$anonfun$3.apply(BlockStoreShuffleFetcher.scala:83)
at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:125)
at org.apache.sp


-- 
Deepak


Re: Un-persist RDD in a loop

2015-06-23 Thread Tom Hubregtsen
I believe that since you are not persisting anything into the memory space
defined by
spark.storage.memoryFraction
you also have nothing to clear from this area using unpersist.

FYI: the data will be kept in the OS buffer/on disk at the point of the
reduce (as this involves a wide dependency -> a shuffle of the data), but you
cannot clear this through the API.
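A hedged sketch of the loop case (the path and the step transformation are
hypothetical): unpersist only releases blocks that were explicitly cached into
the storage region, while shuffle output written during the reduce stays on disk
regardless.

import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

def step(in: RDD[String]): RDD[String] = in.map(_.toUpperCase)   // stand-in transformation

var current: RDD[String] = sc.textFile("hdfs:///path/in").persist(StorageLevel.MEMORY_ONLY)
for (i <- 1 to 10) {
  val next = step(current).persist(StorageLevel.MEMORY_ONLY)
  next.count()                         // materialize before dropping the parent
  current.unpersist(blocking = true)   // frees only the explicitly cached blocks
  current = next
}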



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Un-persist-RDD-in-a-loop-tp23414p23460.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: map V mapPartitions

2015-06-23 Thread Holden Karau
I think one of the primary cases where mapPartitions is useful if you are
going to be doing any setup work that can be re-used between processing
each element, this way the setup work only needs to be done once per
partition (for example creating an instance of jodatime).

Both map and mapPartitions are implemented using the MapPartitionsRDD.

In general if your logic is easily expressed with map, and there isn't any
setup work you are doing that could be shared, using map instead of map
partitions tends to result in more readable code which is valuable in and
off its self.

On Tue, Jun 23, 2015 at 4:57 PM, ÐΞ€ρ@Ҝ (๏̯͡๏)  wrote:

> I know when to use a map () but when should i use mapPartitions() ?
>
> Which is faster ?
>
> --
> Deepak
>
>


-- 
Cell : 425-233-8271
Twitter: https://twitter.com/holdenkarau
Linked In: https://www.linkedin.com/in/holdenkarau


map V mapPartitions

2015-06-23 Thread ๏̯͡๏
I know when to use a map () but when should i use mapPartitions() ?

Which is faster ?

-- 
Deepak


Re: Nested DataFrame(SchemaRDD)

2015-06-23 Thread Michael Armbrust
You can also do this using a sequence of case classes (in the example
stored in a tuple, though the outer container could also be a case class):

case class MyRecord(name: String, location: String)
val df = Seq((1, Seq(MyRecord("Michael", "Berkeley"), MyRecord("Andy",
"Oakland".toDF("id", "people")

df.printSchema

root
 |-- id: integer (nullable = false)
 |-- people: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- location: string (nullable = true)

If this DataFrame is saved to Parquet, the nesting will be preserved.
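For example (the path is a placeholder), the round trip through Parquet keeps the
nested schema:

df.write.parquet("/tmp/people_nested.parquet")
sqlContext.read.parquet("/tmp/people_nested.parquet").printSchema()  // same nested schema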

On Tue, Jun 23, 2015 at 4:35 PM, Roberto Congiu 
wrote:

> I wrote a brief howto on building nested records in spark and storing them
> in parquet here:
> http://www.congiu.com/creating-nested-data-parquet-in-spark-sql/
>
> 2015-06-23 16:12 GMT-07:00 Richard Catlin :
>
>> How do I create a DataFrame(SchemaRDD) with a nested array of Rows in a
>> column?  Is there an example?  Will this store as a nested parquet file?
>>
>> Thanks.
>>
>> Richard Catlin
>>
>
>


Re: Nested DataFrame(SchemaRDD)

2015-06-23 Thread Roberto Congiu
I wrote a brief howto on building nested records in spark and storing them
in parquet here:
http://www.congiu.com/creating-nested-data-parquet-in-spark-sql/

2015-06-23 16:12 GMT-07:00 Richard Catlin :

> How do I create a DataFrame(SchemaRDD) with a nested array of Rows in a
> column?  Is there an example?  Will this store as a nested parquet file?
>
> Thanks.
>
> Richard Catlin
>


Re: Difference between Lasso regression in MLlib package and ML package

2015-06-23 Thread Wei Zhou
Thanks DB Tsai, it is very helpful.

Cheers,
Wei

2015-06-23 16:00 GMT-07:00 DB Tsai :

> Please see the current version of code for better documentation.
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
>
> Sincerely,
>
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
>
> On Tue, Jun 23, 2015 at 3:58 PM, DB Tsai  wrote:
> > The regularization is handled after the objective function of data is
> > computed. See
> https://github.com/apache/spark/blob/6a827d5d1ec520f129e42c3818fe7d0d870dcbef/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
> >  line 348 for L2.
> >
> > For L1, it's handled by OWLQN, so you don't see it explicitly, but the
> > code is in line 128.
> >
> > Sincerely,
> >
> > DB Tsai
> > --
> > Blog: https://www.dbtsai.com
> > PGP Key ID: 0xAF08DF8D
> >
> >
> > On Tue, Jun 23, 2015 at 3:14 PM, Wei Zhou  wrote:
> >> Hi DB Tsai,
> >>
> >> Thanks for your reply. I went through the source code of
> >> LinearRegression.scala. The algorithm minimizes square error L = 1/2n
> ||A
> >> weights - y||^2^. I cannot match this with the elasticNet loss function
> >> found here http://web.stanford.edu/~hastie/glmnet/glmnet_alpha.html,
> which
> >> is the sum of square error plus L1 and L2 penalty.
> >>
> >> I am able to follow the rest of the mathematical deviation in the code
> >> comment. I am hoping if you could point me to any references that can
> fill
> >> this knowledge gap.
> >>
> >> Best,
> >> Wei
> >>
> >>
> >>
> >> 2015-06-19 12:35 GMT-07:00 DB Tsai :
> >>>
> >>> Hi Wei,
> >>>
> >>> I don't think ML is meant for single node computation, and the
> >>> algorithms in ML are designed for pipeline framework.
> >>>
> >>> In short, the lasso regression in ML is new algorithm implemented from
> >>> scratch, and it's faster, and converged to the same solution as R's
> >>> glmnet but with scalability. Here is the talk I gave in Spark summit
> >>> about the new elastic-net feature in ML. I will encourage you to try
> >>> the one ML.
> >>>
> >>>
> >>>
> http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit
> >>>
> >>> Sincerely,
> >>>
> >>> DB Tsai
> >>> --
> >>> Blog: https://www.dbtsai.com
> >>> PGP Key ID: 0xAF08DF8D
> >>>
> >>>
> >>> On Fri, Jun 19, 2015 at 11:38 AM, Wei Zhou 
> wrote:
> >>> > Hi Spark experts,
> >>> >
> >>> > I see lasso regression/ elastic net implementation under both MLLib
> and
> >>> > ML,
> >>> > does anyone know what is the difference between the two
> implementation?
> >>> >
> >>> > In spark summit, one of the keynote speakers mentioned that ML is
> meant
> >>> > for
> >>> > single node computation, could anyone elaborate this?
> >>> >
> >>> > Thanks.
> >>> >
> >>> > Wei
> >>
> >>
>


RE: Nested DataFrame(SchemaRDD)

2015-06-23 Thread Richard Catlin
How do I create a DataFrame(SchemaRDD) with a nested array of Rows in a
column?  Is there an example?  Will this store as a nested parquet file?

Thanks.

Richard Catlin


Re: Difference between Lasso regression in MLlib package and ML package

2015-06-23 Thread DB Tsai
Please see the current version of code for better documentation.
https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala

Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Tue, Jun 23, 2015 at 3:58 PM, DB Tsai  wrote:
> The regularization is handled after the objective function of data is
> computed. See 
> https://github.com/apache/spark/blob/6a827d5d1ec520f129e42c3818fe7d0d870dcbef/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
>  line 348 for L2.
>
> For L1, it's handled by OWLQN, so you don't see it explicitly, but the
> code is in line 128.
>
> Sincerely,
>
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
>
> On Tue, Jun 23, 2015 at 3:14 PM, Wei Zhou  wrote:
>> Hi DB Tsai,
>>
>> Thanks for your reply. I went through the source code of
>> LinearRegression.scala. The algorithm minimizes square error L = 1/2n ||A
>> weights - y||^2^. I cannot match this with the elasticNet loss function
>> found here http://web.stanford.edu/~hastie/glmnet/glmnet_alpha.html, which
>> is the sum of square error plus L1 and L2 penalty.
>>
>> I am able to follow the rest of the mathematical deviation in the code
>> comment. I am hoping if you could point me to any references that can fill
>> this knowledge gap.
>>
>> Best,
>> Wei
>>
>>
>>
>> 2015-06-19 12:35 GMT-07:00 DB Tsai :
>>>
>>> Hi Wei,
>>>
>>> I don't think ML is meant for single node computation, and the
>>> algorithms in ML are designed for pipeline framework.
>>>
>>> In short, the lasso regression in ML is new algorithm implemented from
>>> scratch, and it's faster, and converged to the same solution as R's
>>> glmnet but with scalability. Here is the talk I gave in Spark summit
>>> about the new elastic-net feature in ML. I will encourage you to try
>>> the one ML.
>>>
>>>
>>> http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit
>>>
>>> Sincerely,
>>>
>>> DB Tsai
>>> --
>>> Blog: https://www.dbtsai.com
>>> PGP Key ID: 0xAF08DF8D
>>>
>>>
>>> On Fri, Jun 19, 2015 at 11:38 AM, Wei Zhou  wrote:
>>> > Hi Spark experts,
>>> >
>>> > I see lasso regression/ elastic net implementation under both MLLib and
>>> > ML,
>>> > does anyone know what is the difference between the two implementation?
>>> >
>>> > In spark summit, one of the keynote speakers mentioned that ML is meant
>>> > for
>>> > single node computation, could anyone elaborate this?
>>> >
>>> > Thanks.
>>> >
>>> > Wei
>>
>>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Difference between Lasso regression in MLlib package and ML package

2015-06-23 Thread DB Tsai
The regularization is handled after the objective function of data is
computed. See 
https://github.com/apache/spark/blob/6a827d5d1ec520f129e42c3818fe7d0d870dcbef/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
 line 348 for L2.

For L1, it's handled by OWLQN, so you don't see it explicitly, but the
code is in line 128.
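For reference, the full elastic-net objective being minimized (standard glmnet
form, written out here for clarity rather than quoted from the code) is

L(w) = \frac{1}{2n}\lVert A w - y \rVert_2^2
       + \lambda\left(\alpha\lVert w \rVert_1 + \frac{1-\alpha}{2}\lVert w \rVert_2^2\right)

The 1/(2n) squared-error term is the data part computed first; the L2 term is
added analytically (line 348 above) and the L1 term is applied by the OWLQN
optimizer.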

Sincerely,

DB Tsai
--
Blog: https://www.dbtsai.com
PGP Key ID: 0xAF08DF8D


On Tue, Jun 23, 2015 at 3:14 PM, Wei Zhou  wrote:
> Hi DB Tsai,
>
> Thanks for your reply. I went through the source code of
> LinearRegression.scala. The algorithm minimizes square error L = 1/2n ||A
> weights - y||^2^. I cannot match this with the elasticNet loss function
> found here http://web.stanford.edu/~hastie/glmnet/glmnet_alpha.html, which
> is the sum of square error plus L1 and L2 penalty.
>
> I am able to follow the rest of the mathematical deviation in the code
> comment. I am hoping if you could point me to any references that can fill
> this knowledge gap.
>
> Best,
> Wei
>
>
>
> 2015-06-19 12:35 GMT-07:00 DB Tsai :
>>
>> Hi Wei,
>>
>> I don't think ML is meant for single node computation, and the
>> algorithms in ML are designed for pipeline framework.
>>
>> In short, the lasso regression in ML is new algorithm implemented from
>> scratch, and it's faster, and converged to the same solution as R's
>> glmnet but with scalability. Here is the talk I gave in Spark summit
>> about the new elastic-net feature in ML. I will encourage you to try
>> the one ML.
>>
>>
>> http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit
>>
>> Sincerely,
>>
>> DB Tsai
>> --
>> Blog: https://www.dbtsai.com
>> PGP Key ID: 0xAF08DF8D
>>
>>
>> On Fri, Jun 19, 2015 at 11:38 AM, Wei Zhou  wrote:
>> > Hi Spark experts,
>> >
>> > I see lasso regression/ elastic net implementation under both MLLib and
>> > ML,
>> > does anyone know what is the difference between the two implementation?
>> >
>> > In spark summit, one of the keynote speakers mentioned that ML is meant
>> > for
>> > single node computation, could anyone elaborate this?
>> >
>> > Thanks.
>> >
>> > Wei
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQL vs. DataFrame API

2015-06-23 Thread Davies Liu
If you change it to ```val numbers2 = numbers```, then it has the same problem.

On Tue, Jun 23, 2015 at 2:54 PM, Ignacio Blasco  wrote:
> It seems that it doesn't happen in Scala API. Not exactly the same as in
> python, but pretty close.
>
> https://gist.github.com/elnopintan/675968d2e4be68958df8
>
> 2015-06-23 23:11 GMT+02:00 Davies Liu :
>>
>> I think it also happens in DataFrames API of all languages.
>>
>> On Tue, Jun 23, 2015 at 9:16 AM, Ignacio Blasco 
>> wrote:
>> > That issue happens only in python dsl?
>> >
>> > El 23/6/2015 5:05 p. m., "Bob Corsaro"  escribió:
>> >>
>> >> Thanks! The solution:
>> >>
>> >> https://gist.github.com/dokipen/018a1deeab668efdf455
>> >>
>> >> On Mon, Jun 22, 2015 at 4:33 PM Davies Liu 
>> >> wrote:
>> >>>
>> >>> Right now, we can not figure out which column you referenced in
>> >>> `select`, if there are multiple row with the same name in the joined
>> >>> DataFrame (for example, two `value`).
>> >>>
>> >>> A workaround could be:
>> >>>
>> >>> numbers2 = numbers.select(df.name, df.value.alias('other'))
>> >>> rows = numbers.join(numbers2,
>> >>> (numbers.name==numbers2.name) & (numbers.value !=
>> >>> numbers2.other),
>> >>> how="inner") \
>> >>>   .select(numbers.name, numbers.value, numbers2.other) \
>> >>>   .collect()
>> >>>
>> >>> On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco
>> >>> 
>> >>> wrote:
>> >>> > Sorry thought it was scala/spark
>> >>> >
>> >>> > El 22/6/2015 9:49 p. m., "Bob Corsaro" 
>> >>> > escribió:
>> >>> >>
>> >>> >> That's invalid syntax. I'm pretty sure pyspark is using a DSL to
>> >>> >> create a
>> >>> >> query here and not actually doing an equality operation.
>> >>> >>
>> >>> >> On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco
>> >>> >> 
>> >>> >> wrote:
>> >>> >>>
>> >>> >>> Probably you should use === instead of == and !== instead of !=
>> >>> >>>
>> >>> >>> Can anyone explain why the dataframe API doesn't work as I expect
>> >>> >>> it
>> >>> >>> to
>> >>> >>> here? It seems like the column identifiers are getting confused.
>> >>> >>>
>> >>> >>> https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Kafka createDirectStream ​issue

2015-06-23 Thread syepes
Yes, I have two clusters: one standalone and another using Mesos.

 Sebastian YEPES
   http://sebastian-yepes.com

On Wed, Jun 24, 2015 at 12:37 AM, drarse [via Apache Spark User List] <
ml-node+s1001560n23457...@n3.nabble.com> wrote:

> Hi syepes,
> Are u run the application in standalone mode?
> Regards
> El 23/06/2015 22:48, "syepes [via Apache Spark User List]" <[hidden email]
> > escribió:
>
>> Hello,
>>
>> I ​am trying ​use the new Kafka ​consumer
>> ​​"KafkaUtils.createDirectStream"​ but I am having some issues making it
>> work.
>> I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363
>> and I am still getting the same strange exception "ClassNotFoundException:
>> $line49.$read$$iwC$$i"
>>
>> Has anyone else been facing this kind of problem?
>>
>> The following is the code and logs that I have been using to reproduce
>> the issue:
>>
>> spark-shell: script
>> --
>> sc.stop()
>> import _root_.kafka.serializer.StringDecoder
>> import org.apache.spark.SparkConf
>> import org.apache.spark.streaming._
>> import org.apache.spark.streaming.kafka.KafkaUtils
>>
>> val sparkConf = new
>> SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
>> "4041" ).set("spark.driver.allowMultipleContexts",
>> "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
>>
>> val ssc = new StreamingContext(sparkConf, Seconds(5))
>>
>> val kafkaParams = Map[String, String]("bootstrap.servers" ->
>> "localhost:9092", "schema.registry.url" -> "http://localhost:8081";,
>> "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
>> val topic = Set("test")
>> val messages = KafkaUtils.createDirectStream[String, String,
>> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>>
>> val raw = messages.map(_._2)
>> val words = raw.flatMap(_.split(" "))
>> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
>> wordCounts.print()
>>
>> ssc.start()
>> ssc.awaitTermination()
>> --
>>
>>
>> spark-shell: output
>> --
>> sparkConf: org.apache.spark.SparkConf =
>> org.apache.spark.SparkConf@330e37b2
>> ssc: org.apache.spark.streaming.StreamingContext =
>> org.apache.spark.streaming.StreamingContext@28ec9c23
>> kafkaParams: scala.collection.immutable.Map[String,String] =
>> Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
>> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
>> OPC)topic: scala.collection.immutable.Set[String] = Set(test)
>> WARN  [main] kafka.utils.VerifiableProperties - Property
>> schema.registry.url is not valid
>> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
>> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
>> raw: org.apache.spark.streaming.dstream.DStream[String] =
>> org.apache.spark.streaming.dstream.MappedDStream@578ce232
>> words: org.apache.spark.streaming.dstream.DStream[String] =
>> org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
>> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
>> org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
>> WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
>> schema.registry.url is not valid
>> WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
>> Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
>> java.lang.ClassNotFoundException:
>> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at
>> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>>
>> at
>> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
>> at
>> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
>> at
>> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>> at
>> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>> at
>> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>> at
>> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>> ..
>> ..
>> Driver stacktrace:
>> at 
>> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>>
>> at
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scal

Re: Kafka createDirectStream ​issue

2015-06-23 Thread drarse
Hi syepes,
Are you running the application in standalone mode?
Regards
El 23/06/2015 22:48, "syepes [via Apache Spark User List]" <
ml-node+s1001560n23456...@n3.nabble.com> escribió:

> Hello,
>
> I ​am trying ​use the new Kafka ​consumer
> ​​"KafkaUtils.createDirectStream"​ but I am having some issues making it
> work.
> I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363
> and I am still getting the same strange exception "ClassNotFoundException:
> $line49.$read$$iwC$$i"
>
> Has anyone else been facing this kind of problem?
>
> The following is the code and logs that I have been using to reproduce the
> issue:
>
> spark-shell: script
> --
> sc.stop()
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val sparkConf = new
> SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
> "4041" ).set("spark.driver.allowMultipleContexts",
> "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
>
> val ssc = new StreamingContext(sparkConf, Seconds(5))
>
> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "localhost:9092", "schema.registry.url" -> "http://localhost:8081";,
> "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
> val topic = Set("test")
> val messages = KafkaUtils.createDirectStream[String, String,
> StringDecoder, StringDecoder](ssc, kafkaParams, topic)
>
> val raw = messages.map(_._2)
> val words = raw.flatMap(_.split(" "))
> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
> wordCounts.print()
>
> ssc.start()
> ssc.awaitTermination()
> --
>
>
> spark-shell: output
> --
> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
> ssc: org.apache.spark.streaming.StreamingContext =
> org.apache.spark.streaming.StreamingContext@28ec9c23
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
> OPC)topic: scala.collection.immutable.Set[String] = Set(test)
> WARN  [main] kafka.utils.VerifiableProperties - Property
> schema.registry.url is not valid
> messages: org.apache.spark.streaming.dstream.InputDStream[(String,
> String)] = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
> raw: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.MappedDStream@578ce232
> words: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
> org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
> WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
> schema.registry.url is not valid
> WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
> Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
> java.lang.ClassNotFoundException:
> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
>
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> ..
> ..
> Driver stacktrace:
> at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
>
> at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
>
> at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler

Re: Difference between Lasso regression in MLlib package and ML package

2015-06-23 Thread Wei Zhou
Hi DB Tsai,

Thanks for your reply. I went through the source code of
LinearRegression.scala. The algorithm minimizes the squared error L = 1/(2n) ||A
weights - y||^2. I cannot match this with the elastic-net loss function
found here http://web.stanford.edu/~hastie/glmnet/glmnet_alpha.html, which
is the sum of the squared error plus the L1 and L2 penalties.

I am able to follow the rest of the mathematical derivation in the code
comments. I am hoping you could point me to any references that can fill
this knowledge gap.

Best,
Wei
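
[Editorial note: a minimal sketch of the knob that adds the missing penalty term. In the 1.4 spark.ml API the elastic-net penalty is layered on top of that squared-error term and is controlled by regParam (lambda) and elasticNetParam (alpha); the column names and toy data below are illustrative, assuming a spark-shell `sc`.]

import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)

// toy training data with the conventional "label"/"features" columns
val training = sqlContext.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (1.5, Vectors.dense(2.0, 1.3, 1.0))
)).toDF("label", "features")

val lr = new LinearRegression()
  .setMaxIter(100)
  .setRegParam(0.3)          // lambda: overall regularization strength
  .setElasticNetParam(0.8)   // alpha: 0.0 = pure L2 (ridge), 1.0 = pure L1 (lasso)

val model = lr.fit(training)
println(model.weights)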



2015-06-19 12:35 GMT-07:00 DB Tsai :

> Hi Wei,
>
> I don't think ML is meant for single node computation, and the
> algorithms in ML are designed for the pipeline framework.
>
> In short, the lasso regression in ML is a new algorithm implemented from
> scratch; it's faster, and it converges to the same solution as R's
> glmnet but with scalability. Here is the talk I gave at Spark Summit
> about the new elastic-net feature in ML. I would encourage you to try
> the one in ML.
>
>
> http://www.slideshare.net/dbtsai/2015-06-largescale-lasso-and-elasticnet-regularized-generalized-linear-models-at-spark-summit
>
> Sincerely,
>
> DB Tsai
> --
> Blog: https://www.dbtsai.com
> PGP Key ID: 0xAF08DF8D
>
>
> On Fri, Jun 19, 2015 at 11:38 AM, Wei Zhou  wrote:
> > Hi Spark experts,
> >
> > I see lasso regression/ elastic net implementation under both MLLib and
> ML,
> > does anyone know what is the difference between the two implementation?
> >
> > In spark summit, one of the keynote speakers mentioned that ML is meant
> for
> > single node computation, could anyone elaborate this?
> >
> > Thanks.
> >
> > Wei
>


Re: SQL vs. DataFrame API

2015-06-23 Thread Ignacio Blasco
It seems that it doesn't happen in the Scala API. The example isn't exactly the
same as the Python one, but it's pretty close.

https://gist.github.com/elnopintan/675968d2e4be68958df8
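
[Editorial note: a minimal Scala sketch of the same workaround described below, with a toy "numbers" table standing in for the real data: alias the columns on one side of the self-join so every reference is unambiguous.]

import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// toy data standing in for the "numbers" DataFrame from the thread
val numbers = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3))).toDF("name", "value")

// rename the columns on one side so the join condition has no duplicate names
val numbers2 = numbers.select(numbers("name").as("name2"), numbers("value").as("other"))

val rows = numbers
  .join(numbers2,
    numbers("name") === numbers2("name2") && numbers("value") !== numbers2("other"),
    "inner")
  .select(numbers("name"), numbers("value"), numbers2("other"))
  .collect()

rows.foreach(println)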

2015-06-23 23:11 GMT+02:00 Davies Liu :

> I think it also happens in DataFrames API of all languages.
>
> On Tue, Jun 23, 2015 at 9:16 AM, Ignacio Blasco 
> wrote:
> > That issue happens only in python dsl?
> >
> > El 23/6/2015 5:05 p. m., "Bob Corsaro"  escribió:
> >>
> >> Thanks! The solution:
> >>
> >> https://gist.github.com/dokipen/018a1deeab668efdf455
> >>
> >> On Mon, Jun 22, 2015 at 4:33 PM Davies Liu 
> wrote:
> >>>
> >>> Right now, we can not figure out which column you referenced in
> >>> `select`, if there are multiple row with the same name in the joined
> >>> DataFrame (for example, two `value`).
> >>>
> >>> A workaround could be:
> >>>
> >>> numbers2 = numbers.select(df.name, df.value.alias('other'))
> >>> rows = numbers.join(numbers2,
> >>> (numbers.name==numbers2.name) & (numbers.value !=
> >>> numbers2.other),
> >>> how="inner") \
> >>>   .select(numbers.name, numbers.value, numbers2.other) \
> >>>   .collect()
> >>>
> >>> On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco  >
> >>> wrote:
> >>> > Sorry thought it was scala/spark
> >>> >
> >>> > El 22/6/2015 9:49 p. m., "Bob Corsaro" 
> escribió:
> >>> >>
> >>> >> That's invalid syntax. I'm pretty sure pyspark is using a DSL to
> >>> >> create a
> >>> >> query here and not actually doing an equality operation.
> >>> >>
> >>> >> On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco <
> elnopin...@gmail.com>
> >>> >> wrote:
> >>> >>>
> >>> >>> Probably you should use === instead of == and !== instead of !=
> >>> >>>
> >>> >>> Can anyone explain why the dataframe API doesn't work as I expect
> it
> >>> >>> to
> >>> >>> here? It seems like the column identifiers are getting confused.
> >>> >>>
> >>> >>> https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
>


Re: Accumulators / Accumulables : thread-local, task-local, executor-local ?

2015-06-23 Thread Guillaume Pitel

Hi,

So I've implemented this "node-centered accumulator" and written a small
piece about it:
http://blog.guillaume-pitel.fr/2015/06/spark-trick-shot-node-centered-aggregator/


Hope it can help someone

Guillaume
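
[Editorial note: for readers who just want the shape of the idea, here is a heavily simplified sketch, not the code from the blog post. An AtomicLong stands in for the real count-min sketch, one mutable aggregate lives in a singleton per executor JVM, a first pass updates it, and a second pass reads one value back per executor. SparkEnv is an internal API, used here only to de-duplicate results per executor.]

import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkEnv

object LocalSketch {
  // one instance per executor JVM; a real implementation would hold the sketch
  val agg = new AtomicLong(0L)
}

val data = sc.parallelize(1 to 1000000, 32)

// pass 1: every partition updates the JVM-local aggregate
data.foreachPartition { it =>
  it.foreach(x => LocalSketch.agg.addAndGet(x.toLong))
}

// pass 2: read the aggregate back, keyed by executor id, and merge on the driver
val perExecutor = data.mapPartitions { _ =>
  Iterator((SparkEnv.get.executorId, LocalSketch.agg.get()))
}.collect().toMap

println(perExecutor.values.sum)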



2015-06-18 15:17 GMT+02:00 Guillaume Pitel >:


I was thinking exactly the same. I'm going to try it, It doesn't
really matter if I lose an executor, since its sketch will be
lost, but then reexecuted somewhere else.


I mean that between the action that will update the sketches and the
action to collect/merge them you can lose an executor. So it's outside of
Spark's control. But it's probably an acceptable risk.


And anyway, it's an approximate data structure, and what matters
are ratios, not exact values.

I mostly need to take care of the concurrency problem for my sketch.


I think you could do something like:
  - Have this singleton that holds N sketch instances (one for each 
executor core)
  - Inside an operation over partitions (like 
forEachPartition/mapPartitions)
- at the beginning you ask the singleton to provide you with one
instance to use, in a synchronized fashion, and pick it out from the N
available instances or mark it as in use
- when the iterator over the partition doesn't have more elements 
then you release this sketch
  - Then you can do something like 
sc.parallelize(Seq(...)).coalesce(numExecutors).map(pickTheSketches).reduce(blabla), 
but you will have to make sure that this will be executed over each 
executor (not sure if a dataset < than executor num, will trigger an 
action on every executor)


Please let me know what you end up doing, sounds interesting :)

Eugen


Guillaume

Yeah, that's the problem. There is probably some "perfect" number of
partitions that provides the best balance between partition size
and memory and merge overhead. Though it's not an ideal solution :(

There could be another way but very hacky... for example if you
store one sketch in a singleton per jvm (so per executor). Do a
first pass over your data and update those. Then you trigger some
other dummy operation that will just retrieve the sketches.
That's kind of a hack, but it should work.

Note that if you lose an executor in between, then that doesn't
work anymore; you could probably detect it and recompute the
sketches, but it would become overcomplicated.



2015-06-18 14:27 GMT+02:00 Guillaume Pitel
mailto:guillaume.pi...@exensa.com>>:

Hi,

Thank you for this confirmation.

Coalescing is what we do now. It creates, however, very big
partitions.

Guillaume

Hey,

I am not 100% sure but from my understanding accumulators
are per partition (so per task as its the same) and are sent
back to the driver with the task result and merged. When a
task needs to be run n times (multiple rdds depend on this
one, some partition loss later in the chain etc) then the
accumulator will count n times the values from that task.
So in short I don't think you'd win from using an
accumulator over what you are doing right now.

You could maybe coalesce your rdd to num-executors without a
shuffle and then update the sketches. You should endup with
1 partition per executor thus 1 sketch per executor. You
could then increase the number of threads per task if you
can use the sketches concurrently.

Eugen

2015-06-18 13:36 GMT+02:00 Guillaume Pitel
mailto:guillaume.pi...@exensa.com>>:

Hi,

I'm trying to figure out the smartest way to implement a
global count-min-sketch on accumulators. For now, we are
doing that with RDDs. It works well, but with one sketch
per partition, merging takes too long.

As you probably know, a count-min sketch is a big
mutable array of array of ints. To distribute it, all
sketches must have the same size. Since it can be big,
and since merging is not free, I would like to minimize
the number of sketches and maximize reuse and conccurent
use of the sketches. Ideally, I would like to just have
one sketch per worker.

I think accumulables might be the right structures for
that, but it seems that they are not shared between
executors, or even between tasks.


https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/Accumulators.scala
(289)
/**

* This thread-local map holds per-task copies of
accumulators; it is used to collect the set

* of accumulator updates to send back to the driver
when tasks complete. After tasks complete,

* this map is cleared by `Accumulators.clear()` (see
Executor.sc

Re: How Spark Execute chaining vs no chaining statements

2015-06-23 Thread Richard Marscher
There should be no difference, assuming you don't use the intermediate
stored RDD values you are creating for anything else (rdd1, rdd2). In the
first example it is still creating these intermediate RDD objects; you are
just using them implicitly and not storing the value.

It's also worth pointing out that Spark is able to pipeline operations
together into stages. That is, it should effectively translate something
like map(f1).map(f2).map(f3) to map(f1 -> f2 -> f3) in pseudocode, if
you will. Here is a more detailed explanation from one of the committers
on SO:
http://stackoverflow.com/questions/19340808/spark-single-pipelined-scala-command-better-than-separate-commands
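
[Editorial note: a quick way to see this from a spark-shell; the functions below are arbitrary. Both forms build the same lineage and are pipelined into a single stage.]

val rdd = sc.parallelize(1 to 10)

val chained = rdd.map(_ + 1).map(_ * 2).map(_.toString)

val rdd1 = rdd.map(_ + 1)
val rdd2 = rdd1.map(_ * 2)
val rdd3 = rdd2.map(_.toString)

println(chained.toDebugString)  // same narrow-dependency chain ...
println(rdd3.toDebugString)     // ... as this one; no extra stages or shuffles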

On Tue, Jun 23, 2015 at 5:17 PM, Ashish Soni  wrote:

> Hi All ,
>
> What is difference between below in terms of execution to the cluster with
> 1 or more worker node
>
> rdd.map(...).map(...)...map(..)
>
> vs
>
> val rdd1 = rdd.map(...)
> val rdd2 = rdd1.map(...)
> val rdd3 = rdd2.map(...)
>
> Thanks,
> Ashish
>


Re: spark streaming with kafka jar missing

2015-06-23 Thread Shushant Arora
Thanks a lot. It worked after keeping all versions the same: 1.2.0.

On Wed, Jun 24, 2015 at 2:24 AM, Tathagata Das  wrote:

> Why are you mixing spark versions between streaming and core??
> Your core is 1.2.0 and streaming is 1.4.0.
>
> On Tue, Jun 23, 2015 at 1:32 PM, Shushant Arora  > wrote:
>
>> It throws exception for WriteAheadLogUtils after excluding core and
>> streaming jar.
>>
>> Exception in thread "main" java.lang.NoClassDefFoundError:
>> org/apache/spark/streaming/util/WriteAheadLogUtils$
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:65)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103)
>> at
>> org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
>> at
>> com.adobe.hadoop.saprk.sample.SampleSparkStreamApp.main(SampleSparkStreamApp.java:25)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
>> at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
>> at java.lang.reflect.Method.invoke(Method.java:597)
>> at
>> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
>> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
>> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>>
>>
>> pom.xml is :
>>
>> http://maven.apache.org/POM/4.0.0"; xmlns:xsi="
>> http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="
>> http://maven.apache.org/POM/4.0.0
>> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>>   4.0.0
>>   
>>   SampleSparkStreamApp
>>   1.0
>>
>>
>> 
>> 
>> org.apache.spark
>> spark-core_2.10
>> 1.2.0
>> provided
>> 
>>  
>>  org.apache.spark
>> spark-streaming-kafka_2.10
>> 1.4.0
>> 
>>  
>> org.apache.spark
>> spark-streaming_2.10
>> provided
>> 1.4.0
>> 
>>  
>>   
>> 
>>   
>>   
>> maven-assembly-plugin
>> 
>>   
>> package
>> 
>>   single
>> 
>>   
>> 
>> 
>>   
>> jar-with-dependencies
>>   
>> 
>>   
>> 
>>   
>>
>> 
>>
>> And when I pass streaming jar using --jar option , it threw
>> same java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$.
>>
>> Thanks
>>
>> On Wed, Jun 24, 2015 at 1:17 AM, Tathagata Das 
>> wrote:
>>
>>> You must not include spark-core and spark-streaming in the assembly.
>>> They are already present in the installation and the presence of multiple
>>> versions of spark may throw off the classloaders in weird ways. So make the
>>> assembly marking the those dependencies as scope=provided.
>>>
>>>
>>>
>>> On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 hi

 While using spark streaming (1.2) with kafka . I am getting below error
 and receivers are getting killed but jobs get scheduled at each stream
 interval.

 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID
 82, ip(XX)): java.io.IOException: Failed to connect to ip()
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
 at
 org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
 at
 org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
 at
 org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
 at
 java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
 at java.util.concurrent.FutureTask.run(FutureTask.java:262)
 at
 java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
 at
 java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
 at java.lang.Thread.run(Thread.java:744)


 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for
 stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
 org/apache/spark/util/ThreadUtils$
 at
 org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
 at
 org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
 at
 org.apache.spark.streaming.receiver.Rec

How Spark Execute chaining vs no chaining statements

2015-06-23 Thread Ashish Soni
Hi All ,

What is difference between below in terms of execution to the cluster with
1 or more worker node

rdd.map(...).map(...)...map(..)

vs

val rdd1 = rdd.map(...)
val rdd2 = rdd1.map(...)
val rdd3 = rdd2.map(...)

Thanks,
Ashish


Re: SQL vs. DataFrame API

2015-06-23 Thread Davies Liu
I think it also happens in DataFrames API of all languages.

On Tue, Jun 23, 2015 at 9:16 AM, Ignacio Blasco  wrote:
> That issue happens only in python dsl?
>
> El 23/6/2015 5:05 p. m., "Bob Corsaro"  escribió:
>>
>> Thanks! The solution:
>>
>> https://gist.github.com/dokipen/018a1deeab668efdf455
>>
>> On Mon, Jun 22, 2015 at 4:33 PM Davies Liu  wrote:
>>>
>>> Right now, we can not figure out which column you referenced in
>>> `select`, if there are multiple row with the same name in the joined
>>> DataFrame (for example, two `value`).
>>>
>>> A workaround could be:
>>>
>>> numbers2 = numbers.select(df.name, df.value.alias('other'))
>>> rows = numbers.join(numbers2,
>>> (numbers.name==numbers2.name) & (numbers.value !=
>>> numbers2.other),
>>> how="inner") \
>>>   .select(numbers.name, numbers.value, numbers2.other) \
>>>   .collect()
>>>
>>> On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco 
>>> wrote:
>>> > Sorry thought it was scala/spark
>>> >
>>> > El 22/6/2015 9:49 p. m., "Bob Corsaro"  escribió:
>>> >>
>>> >> That's invalid syntax. I'm pretty sure pyspark is using a DSL to
>>> >> create a
>>> >> query here and not actually doing an equality operation.
>>> >>
>>> >> On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco 
>>> >> wrote:
>>> >>>
>>> >>> Probably you should use === instead of == and !== instead of !=
>>> >>>
>>> >>> Can anyone explain why the dataframe API doesn't work as I expect it
>>> >>> to
>>> >>> here? It seems like the column identifiers are getting confused.
>>> >>>
>>> >>> https://gist.github.com/dokipen/4b324a7365ae87b7b0e5

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark standalone cluster - resource management

2015-06-23 Thread Nizan Grauer
I have 30G per machine.

This is the first (and only) job I'm trying to submit, so it's strange that
it works with --total-executor-cores=20 but not with --total-executor-cores=4.
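
[Editorial note: a minimal sketch of the knobs being discussed, set from code instead of the command line; the master URL and values are illustrative. --total-executor-cores maps to spark.cores.max, and the executor memory request must fit on a worker for executors to be scheduled.]

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("spark://master:7077")       // assumed standalone master URL
  .setAppName("small-task")
  .set("spark.cores.max", "4")            // same effect as --total-executor-cores 4
  .set("spark.executor.memory", "8g")     // must fit on a 30G worker alongside other apps

val sc = new SparkContext(conf)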

On Tue, Jun 23, 2015 at 10:46 PM, Igor Berman  wrote:

> probably there are already running jobs there
> in addition, memory is also a resource, so if you are running 1
> application that took all your memory and then you are trying to run
> another application that asks
> for the memory the cluster doesn't have then the second app wont be running
>
> so why are u specifying 22g as executor memory? how much memory you have
> for each machine?
>
> On 23 June 2015 at 09:33, nizang  wrote:
>
>> to give a bit more data on what I'm trying to get -
>>
>> I have many tasks I want to run in parallel, so I want each task to catch
>> small part of the cluster (-> only limited part of my 20 cores in the
>> cluster)
>>
>> I have important tasks that I want them to get 10 cores, and I have small
>> tasks that I want to run with only 1 or 2 cores)
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-resource-management-tp23444p23445.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
>>
>>
>


Re: spark streaming with kafka jar missing

2015-06-23 Thread Tathagata Das
Why are you mixing spark versions between streaming and core??
Your core is 1.2.0 and streaming is 1.4.0.

On Tue, Jun 23, 2015 at 1:32 PM, Shushant Arora 
wrote:

> It throws exception for WriteAheadLogUtils after excluding core and
> streaming jar.
>
> Exception in thread "main" java.lang.NoClassDefFoundError:
> org/apache/spark/streaming/util/WriteAheadLogUtils$
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:65)
> at
> org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103)
> at
> org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
> at
> com.adobe.hadoop.saprk.sample.SampleSparkStreamApp.main(SampleSparkStreamApp.java:25)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
> at java.lang.reflect.Method.invoke(Method.java:597)
> at
> org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
>
> pom.xml is :
>
> http://maven.apache.org/POM/4.0.0"; xmlns:xsi="
> http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="
> http://maven.apache.org/POM/4.0.0
> http://maven.apache.org/xsd/maven-4.0.0.xsd";>
>   4.0.0
>   
>   SampleSparkStreamApp
>   1.0
>
>
> 
> 
> org.apache.spark
> spark-core_2.10
> 1.2.0
> provided
> 
>  
>  org.apache.spark
> spark-streaming-kafka_2.10
> 1.4.0
> 
>  
> org.apache.spark
> spark-streaming_2.10
> provided
> 1.4.0
> 
>  
>   
> 
>   
>   
> maven-assembly-plugin
> 
>   
> package
> 
>   single
> 
>   
> 
> 
>   
> jar-with-dependencies
>   
> 
>   
> 
>   
>
> 
>
> And when I pass streaming jar using --jar option , it threw
> same java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$.
>
> Thanks
>
> On Wed, Jun 24, 2015 at 1:17 AM, Tathagata Das 
> wrote:
>
>> You must not include spark-core and spark-streaming in the assembly. They
>> are already present in the installation and the presence of multiple
>> versions of spark may throw off the classloaders in weird ways. So make the
>> assembly marking the those dependencies as scope=provided.
>>
>>
>>
>> On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> hi
>>>
>>> While using spark streaming (1.2) with kafka . I am getting below error
>>> and receivers are getting killed but jobs get scheduled at each stream
>>> interval.
>>>
>>> 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID
>>> 82, ip(XX)): java.io.IOException: Failed to connect to ip()
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>>> at
>>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>>> at
>>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>>> at
>>> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>>> at
>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>> at
>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>> at java.lang.Thread.run(Thread.java:744)
>>>
>>>
>>> 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for
>>> stream 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>>> org/apache/spark/util/ThreadUtils$
>>> at
>>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
>>> at
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>>> at
>>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>>> at
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
>>> at
>>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLaunch

Re: Kafka createDirectStream ​issue

2015-06-23 Thread Cody Koeninger
The $line49 in the exception refers to a line of the spark shell.

Have you tried it from an actual assembled job with spark-submit?
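
[Editorial note: a minimal sketch of the same logic as a compiled application submitted with spark-submit, which avoids the REPL-generated $line classes entirely. Object name, master URL, and jar names are illustrative.]

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

object KConJob {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("KCon")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map("bootstrap.servers" -> "localhost:9092")
    val topics = Set("test")
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topics)

    val wordCounts = messages.map(_._2)
      .flatMap(_.split(" "))
      .map(x => (x, 1L))
      .reduceByKey(_ + _)
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

// submit (sketch):
//   spark-submit --class KConJob --master spark://localhost:7077 \
//     --jars spark-streaming-kafka-assembly_2.10-1.4.0.jar app.jar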

On Tue, Jun 23, 2015 at 3:48 PM, syepes  wrote:

> Hello,
>
> I ​am trying ​use the new Kafka ​consumer
> ​​"KafkaUtils.createDirectStream"​
> but I am having some issues making it work.
> I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363 and
> I am still getting the same strange exception "ClassNotFoundException:
> $line49.$read$$iwC$$i"
>
> Has anyone else been facing this kind of problem?
>
> The following is the code and logs that I have been using to reproduce the
> issue:
>
> spark-shell: script
> --
> sc.stop()
> import _root_.kafka.serializer.StringDecoder
> import org.apache.spark.SparkConf
> import org.apache.spark.streaming._
> import org.apache.spark.streaming.kafka.KafkaUtils
>
> val sparkConf = new
>
> SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
> "4041" ).set("spark.driver.allowMultipleContexts",
>
> "true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
> val ssc = new StreamingContext(sparkConf, Seconds(5))
>
> val kafkaParams = Map[String, String]("bootstrap.servers" ->
> "localhost:9092", "schema.registry.url" -> "http://localhost:8081";,
> "zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
> val topic = Set("test")
> val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
> StringDecoder](ssc, kafkaParams, topic)
>
> val raw = messages.map(_._2)
> val words = raw.flatMap(_.split(" "))
> val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
> wordCounts.print()
>
> ssc.start()
> ssc.awaitTermination()
> --
>
>
> spark-shell: output
> --
> sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
> ssc: org.apache.spark.streaming.StreamingContext =
> org.apache.spark.streaming.StreamingContext@28ec9c23
> kafkaParams: scala.collection.immutable.Map[String,String] =
> Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
> http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
> OPC)topic: scala.collection.immutable.Set[String] = Set(test)
> WARN  [main] kafka.utils.VerifiableProperties - Property
> schema.registry.url
> is not valid
> messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)]
> = org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
> raw: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.MappedDStream@578ce232
> words: org.apache.spark.streaming.dstream.DStream[String] =
> org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
> wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
> org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
> WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
> schema.registry.url is not valid
> WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
> Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
> java.lang.ClassNotFoundException:
>
> $line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:348)
> at
>
> org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
> at
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
> at
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
> ..
> ..
> Driver stacktrace:
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortSta

Kafka createDirectStream ​issue

2015-06-23 Thread syepes
Hello,

I ​am trying ​use the new Kafka ​consumer ​​"KafkaUtils.createDirectStream"​
but I am having some issues making it work.
I have tried different versions of Spark v1.4.0 and branch-1.4 #8d6e363 and
I am still getting the same strange exception "ClassNotFoundException:
$line49.$read$$iwC$$i"

Has anyone else been facing this kind of problem?

The following is the code and logs that I have been using to reproduce the
issue:

spark-shell: script
--
sc.stop()
import _root_.kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka.KafkaUtils

val sparkConf = new
SparkConf().setMaster("spark://localhost:7077").setAppName("KCon").set("spark.ui.port",
"4041" ).set("spark.driver.allowMultipleContexts",
"true").setJars(Array("/opt/spark-libs/spark-streaming-kafka-assembly_2.10-1.4.2-SNAPSHOT.jar"))
val ssc = new StreamingContext(sparkConf, Seconds(5))

val kafkaParams = Map[String, String]("bootstrap.servers" ->
"localhost:9092", "schema.registry.url" -> "http://localhost:8081";,
"zookeeper.connect" -> "localhost:2181", "group.id" -> "KCon" )
val topic = Set("test")
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,
StringDecoder](ssc, kafkaParams, topic)

val raw = messages.map(_._2)
val words = raw.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()
--


spark-shell: output
--
sparkConf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@330e37b2
ssc: org.apache.spark.streaming.StreamingContext =
org.apache.spark.streaming.StreamingContext@28ec9c23
kafkaParams: scala.collection.immutable.Map[String,String] =
Map(bootstrap.servers -> localhost:9092, schema.registry.url ->
http://localhost:8081, zookeeper.connect -> localhost:2181, group.id ->
OPC)topic: scala.collection.immutable.Set[String] = Set(test)
WARN  [main] kafka.utils.VerifiableProperties - Property schema.registry.url
is not valid
messages: org.apache.spark.streaming.dstream.InputDStream[(String, String)]
= org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1e71b70d
raw: org.apache.spark.streaming.dstream.DStream[String] =
org.apache.spark.streaming.dstream.MappedDStream@578ce232
words: org.apache.spark.streaming.dstream.DStream[String] =
org.apache.spark.streaming.dstream.FlatMappedDStream@351cc4b5
wordCounts: org.apache.spark.streaming.dstream.DStream[(String, Long)] =
org.apache.spark.streaming.dstream.ShuffledDStream@ae04104
WARN  [JobGenerator] kafka.utils.VerifiableProperties - Property
schema.registry.url is not valid
WARN  [task-result-getter-0] org.apache.spark.scheduler.TaskSetManager -
Lost task 0.0 in stage 0.0 (TID 0, 10.3.30.87):
java.lang.ClassNotFoundException:
$line49.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:66)
at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
..
..
Driver stacktrace:
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1266)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1257)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1256)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1256)
--


Best regards and thanks in advance for any help.
  



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Kafka-createDirectStream-issue-tp23456.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To uns

Re: spark streaming with kafka jar missing

2015-06-23 Thread Shushant Arora
It throws exception for WriteAheadLogUtils after excluding core and
streaming jar.

Exception in thread "main" java.lang.NoClassDefFoundError:
org/apache/spark/streaming/util/WriteAheadLogUtils$
at
org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:84)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:65)
at
org.apache.spark.streaming.kafka.KafkaUtils$.createStream(KafkaUtils.scala:103)
at
org.apache.spark.streaming.kafka.KafkaUtils.createStream(KafkaUtils.scala)
at
com.adobe.hadoop.saprk.sample.SampleSparkStreamApp.main(SampleSparkStreamApp.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at
org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)


pom.xml is :

http://maven.apache.org/POM/4.0.0"; xmlns:xsi="
http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="
http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
  4.0.0
  
  SampleSparkStreamApp
  1.0




org.apache.spark
spark-core_2.10
1.2.0
provided

 
 org.apache.spark
spark-streaming-kafka_2.10
1.4.0

 
org.apache.spark
spark-streaming_2.10
provided
1.4.0

 
  

  
  
maven-assembly-plugin

  
package

  single

  


  
jar-with-dependencies
  

  

  



And when I pass streaming jar using --jar option , it threw
same java.lang.NoClassDefFoundError: org/apache/spark/util/ThreadUtils$.

Thanks

On Wed, Jun 24, 2015 at 1:17 AM, Tathagata Das  wrote:

> You must not include spark-core and spark-streaming in the assembly. They
> are already present in the installation and the presence of multiple
> versions of spark may throw off the classloaders in weird ways. So make the
> assembly marking the those dependencies as scope=provided.
>
>
>
> On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> hi
>>
>> While using spark streaming (1.2) with kafka . I am getting below error
>> and receivers are getting killed but jobs get scheduled at each stream
>> interval.
>>
>> 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID
>> 82, ip(XX)): java.io.IOException: Failed to connect to ip()
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
>> at
>> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
>> at
>> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
>> at
>> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
>> at
>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:744)
>>
>>
>> 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream
>> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
>> org/apache/spark/util/ThreadUtils$
>> at
>> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
>> at
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
>> at
>> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
>> at
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
>> at
>> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
>> at
>> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
>> at
>> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>> at org.apache.spark.scheduler.Task.run(Task.scala:56

Re: Calculating tuple count /input rate with time

2015-06-23 Thread Tathagata Das
This should give an accurate count for each batch, though to get the rate
you have to make sure that your streaming app is stable, that is, that batches
are processed as fast as they are received (the scheduling delay in the Spark
Streaming UI is approximately 0).

TD
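
[Editorial note: on the cost question, a Scala sketch of the same measurement using DStream.count(). The per-batch count is computed on the executors and only a single Long per batch reaches the driver; stream and label names are illustrative.]

import org.apache.spark.streaming.dstream.DStream

def logBatchSize(input: DStream[String]): Unit = {
  input.count().foreachRDD { rdd =>
    // count() yields one-element RDDs, so this collects a single Long per batch
    val n = rdd.collect().headOption.getOrElse(0L)
    println(s"${System.currentTimeMillis()},inputRecords,$n")
  }
}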

On Tue, Jun 23, 2015 at 2:49 AM, anshu shukla 
wrote:

> I am calculating input rate using the following logic.
>
> And I think this foreachRDD is always running on the driver (the printlns are
> seen on the driver).
>
> 1- Is there any other way to do this at a lower cost?
>
> 2- Will this give me the correct count for the rate?
>
>
> //code -
>
> inputStream.foreachRDD(new Function, Void>() {
> @Override
> public Void call(JavaRDD stringJavaRDD) throws Exception {
> System.out.println(System.currentTimeMillis()+",spoutstringJavaRDD," 
> + stringJavaRDD.count() );
> return null;
> }
> });
>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


kafka spark streaming with mesos

2015-06-23 Thread Bartek Radziszewski
Hey,

I’m trying to run Kafka Spark Streaming on Mesos with the following example:

sc.stop
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext._
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.storage.StorageLevel
val sparkConf = new 
SparkConf().setAppName("Summarizer").setMaster("zk://127.0.0.1:2181/mesos")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val kafkaParams = Map[String, String]("zookeeper.connect" -> "127.0.0.1:2181", 
"group.id" -> "test")
val messages = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](ssc, kafkaParams, Map("test" -> 1), 
StorageLevel.MEMORY_ONLY_SER).map(_._2)

messages.foreachRDD { pairRDD =>
println(s"DataListener.listen() [pairRDD = ${pairRDD}]")
println(s"DataListener.listen() [pairRDD.count = ${pairRDD.count()}]")
pairRDD.foreach(row => println(s"DataListener.listen() [row = ${row}]"))
val msgData = pairRDD.collect()
}

Unfortunately println(s"DataListener.listen() [pairRDD.count = 
${pairRDD.count()}]”) returning always 0

I tested the same example using “local[2]” instead of
"zk://127.0.0.1:2181/mesos” and everything works perfectly (count returns the
correct number of produced messages, and pairRDD.foreach(row =>
println(s"DataListener.listen() [row = ${row}]”)) prints the Kafka messages).

Could you help me understand this issue? Where am I going wrong?

attaching:
spark shell output http://pastebin.com/zdYFBj4T 
executor output http://pastebin.com/LDMtCjq0 

thanks!
bartek

Re: java.lang.IllegalArgumentException: A metric named ... already exists

2015-06-23 Thread Tathagata Das
Aaah, this could potentially be a major issue, as it may prevent metrics from
a restarted streaming context from being published. Can you file it as a JIRA?

TD
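
[Editorial note: for the JIRA, a sketch of the sequence described above, reconstructed rather than taken from the original program; `sc` is the shared SparkContext and the toy queue stream just gives each context an output operation. Whether this minimal version reproduces the warning depends on the metrics setup.]

import scala.collection.mutable
import org.apache.spark.streaming.{Seconds, StreamingContext}

for (i <- 1 to 3) {
  val ssc = new StreamingContext(sc, Seconds(1))           // reuse the same SparkContext
  val q = mutable.Queue(sc.makeRDD(1 to 100))
  ssc.queueStream(q).count().print()                       // trivial output operation
  ssc.start()
  ssc.awaitTerminationOrTimeout(5000)
  ssc.stop(stopSparkContext = false)                       // keep sc alive for the next context
}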

On Tue, Jun 23, 2015 at 7:59 AM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> I'm running a program in Spark 1.4 where several Spark Streaming contexts
> are created from the same Spark context. As pointed in
> https://spark.apache.org/docs/latest/streaming-programming-guide.html
> each Spark Streaming context is stopped before creating the next Spark
> Streaming context. The program works ok, but I get exceptions like the
> following when a new Spark Streaming context is created
>
> 15/06/23 16:34:51 INFO MetricsSystem: Metrics already registered
> java.lang.IllegalArgumentException: A metric named
> local-1435070090627.driver.SampleStreamingTest.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime
> already exists
> at
> com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
> at
> com.codahale.metrics.MetricRegistry.registerAll(MetricRegistry.java:385)
> at
> com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:85)
>
>
> Is this something to be concerned, or just a minor nuisance?
>
> Thanks a lot in advance.
>
> Greetings,
>
> Juan Rodriguez Hortala
>


Re: spark streaming with kafka jar missing

2015-06-23 Thread Tathagata Das
You must not include spark-core and spark-streaming in the assembly. They
are already present in the installation and the presence of multiple
versions of Spark may throw off the classloaders in weird ways. So build the
assembly marking those dependencies as scope=provided.
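
[Editorial note: the same idea expressed as an sbt build sketch; the thread uses Maven, artifact names follow the POM quoted below, and every Spark module stays on one version.]

// build.sbt (sketch)
val sparkVersion = "1.2.0"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"            % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming"       % sparkVersion % "provided",
  "org.apache.spark" %% "spark-streaming-kafka" % sparkVersion            // goes into the assembly
)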



On Tue, Jun 23, 2015 at 11:56 AM, Shushant Arora 
wrote:

> hi
>
> While using spark streaming (1.2) with kafka . I am getting below error
> and receivers are getting killed but jobs get scheduled at each stream
> interval.
>
> 15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID
> 82, ip(XX)): java.io.IOException: Failed to connect to ip()
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
> at
> org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
> at
> org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
> at
> org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
> at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
>
> 15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream
> 0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
> org/apache/spark/util/ThreadUtils$
> at
> org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
> at
> org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
> at
> org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
> at
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:744)
>
>
> I created jar with include all dependencies. Which jar is missing here ?
>
>
>
>
>


When to use underlying data management layer versus standalone Spark?

2015-06-23 Thread commtech
Hi,

I work at a large financial institution in New York. We're looking into
Spark and trying to learn more about the deployment/use cases for real-time
analytics with Spark. When would it be better to deploy standalone Spark
versus Spark on top of a more comprehensive data management layer (Hadoop,
Cassandra, MongoDB, etc.)? If you do deploy on top of one of these, are
there different use cases where one of these database management layers are
better or worse?

Any color would be very helpful. Thank you in advance.

Sincerely,
Michael





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/When-to-use-underlying-data-management-layer-versus-standalone-Spark-tp23455.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark standalone cluster - resource management

2015-06-23 Thread Igor Berman
probably there are already running jobs there
in addition, memory is also a resource, so if you are running 1 application
that took all your memory and then you are trying to run another
application that asks
for the memory the cluster doesn't have then the second app wont be running

so why are u specifying 22g as executor memory? how much memory you have
for each machine?

On 23 June 2015 at 09:33, nizang  wrote:

> to give a bit more data on what I'm trying to get -
>
> I have many tasks I want to run in parallel, so I want each task to catch
> small part of the cluster (-> only limited part of my 20 cores in the
> cluster)
>
> I have important tasks that I want them to get 10 cores, and I have small
> tasks that I want to run with only 1 or 2 cores)
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-standalone-cluster-resource-management-tp23444p23445.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Registering custom metrics

2015-06-23 Thread Otis Gospodnetić
Hi,

Not sure if this will fit your needs, but if you are trying to
collect+chart some metrics specific to your app, yet want to correlate them
with what's going on in Spark, maybe Spark's performance numbers, you may
want to send your custom metrics to SPM, so they can be
visualized/analyzed/"dashboarded" along with your Spark metrics. See
http://sematext.com/spm/integrations/spark-monitoring.html for the Spark
piece and https://sematext.atlassian.net/wiki/display/PUBSPM/Custom+Metrics
for Custom Metrics. If you use Coda Hale's metrics lib, that works too;
there is a pluggable reporter that will send Coda Hale metrics to SPM as well.

HTH.

Otis
--
Monitoring * Alerting * Anomaly Detection * Centralized Log Management
Solr & Elasticsearch Support * http://sematext.com/


On Mon, Jun 22, 2015 at 9:57 AM, dgoldenberg 
wrote:

> Hi Gerard,
>
> Have there been any responses? Any insights as to what you ended up doing
> to
> enable custom metrics? I'm thinking of implementing a custom metrics sink,
> not sure how doable that is yet...
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Registering-custom-metrics-tp17765p23426.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


spark streaming with kafka jar missing

2015-06-23 Thread Shushant Arora
hi

While using spark streaming (1.2) with kafka . I am getting below error
and receivers are getting killed but jobs get scheduled at each stream
interval.

15/06/23 18:42:35 WARN TaskSetManager: Lost task 0.1 in stage 18.0 (TID 82,
ip(XX)): java.io.IOException: Failed to connect to ip()
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:191)
at
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:156)
at
org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:78)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
at
org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
at
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


15/06/23 18:42:36 ERROR ReceiverTracker: Deregistered receiver for stream
0: Error starting receiver 0 - java.lang.NoClassDefFoundError:
org/apache/spark/util/ThreadUtils$
at
org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:115)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:121)
at
org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:106)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:277)
at
org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverLauncher$$anonfun$8.apply(ReceiverTracker.scala:269)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
at
org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1319)
at
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:56)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)


I created jar with include all dependencies. Which jar is missing here ?


Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java

2015-06-23 Thread Tathagata Das
Yes, this is known behavior. Static fields are not serialized as part
of a task.

On Tue, Jun 23, 2015 at 10:24 AM, Nipun Arora 
wrote:

> I found the error so just posting on the list.
>
> It seems broadcast variables cannot be declared static.
> If you do you get a null pointer exception.
>
> Thanks
> Nipun
>
> On Tue, Jun 23, 2015 at 11:08 AM, Nipun Arora 
> wrote:
>
>> btw. just for reference I have added the code in a gist:
>>
>> https://gist.github.com/nipunarora/ed987e45028250248edc
>>
>> and a stackoverflow reference here:
>>
>>
>> http://stackoverflow.com/questions/31006490/broadcast-variable-null-pointer-exception-in-spark-streaming
>>
>> On Tue, Jun 23, 2015 at 11:01 AM, Nipun Arora 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a spark streaming application where I need to access a model
>>> saved in a HashMap.
>>> I have *no problems in running the same code with broadcast variables
>>> in the local installation.* However I get a *null pointer* *exception*
>>> when I deploy it on my spark test cluster.
>>>
>>>
>>> I have stored a model in a HashMap which is
>>> serializable. I use a broadcast variables declared as a global static
>>> variable to broadcast this hashmap:
>>>
>>> public static Broadcast> br;
>>>
>>> HashMap hm = checkerObj.getModel(esserver, type);
>>>
>>> br = ssc.sparkContext().broadcast(hm);
>>>
>>>
>>> I need to access this model in my mapper phase, and do some operation
>>> based on the checkup. The following is a snippet of how I access the
>>> broadcast variable.
>>>
>>>
>>> JavaDStream> split = matched.map(new 
>>> GenerateType2Scores());
>>>
>>>
>>> class GenerateType2Scores implements Function>> String>> {
>>> @Override
>>> public Tuple3 call(String s) throws Exception{
>>>
>>> Long time = Type2ViolationChecker.getMTS(s);
>>> HashMap temphm= Type2ViolationChecker.br.value();
>>>
>>> Double score = Type2ViolationChecker.getAnomalyScore(temphm,s);
>>> return new Tuple3(time,score, s);}
>>> }
>>>
>>> The temphm should refer to the hashmap stored in the broadcast variable.
>>> Can anyone help me understand what is the correct way to access
>>> broadcast variables in JAVA?
>>>
>>> Thanks
>>> Nipun
>>>
>>
>>
>


Can Spark1.4 work with CDH4.6

2015-06-23 Thread Yana Kadiyska
Hi folks, I have been using Spark against an external metastore service
which runs Hive on CDH 4.6.

In Spark 1.2, I was able to successfully connect by building with the
following:

./make-distribution.sh --tgz -Dhadoop.version=2.0.0-mr1-cdh4.2.0
-Phive-thriftserver -Phive-0.12.0

I see that in Spark 1.4 the Hive 0.12.0 profile is deprecated in favor of
spark.sql.hive.metastore.version/spark.sql.hive.metastore.jars

When I tried to use this setup spark-shell fails for me with the following
error:

15/06/23 18:18:07 INFO hive.HiveContext: Initializing
HiveMetastoreConnection version 0.12.0 using [Ljava.net.URL;@7b7a9a6c
java.lang.ClassNotFoundException: java.lang.NoClassDefFoundError:
com/google/common/base/Preconditions when creating Hive client using
classpath: file:/hive/lib/guava-11.0.2.jar,
file:/hive/lib/hive-exec-0.10.0-cdh4.6.0.jar,
file:/hive/lib/hive-metastore-0.10.0-cdh4.6.0.jar,
file:/hadoop/share/hadoop/mapreduce1/lib/hadoop-common-2.0.0-cdh4.6.0.jar,
file:/hive/lib/commons-logging-1.0.4.jar

​

I don't know why it's not seeing the class -- it's in the guava jar. If
anyone has had success with the 0.12 version, please let me know what jars need
to be on the classpath. I think my Hive version might be too outdated, but I
don't control the metastore, and I had success with Spark 1.2 so I'm hoping...


Re: org.apache.spark.sql.ScalaReflectionLock

2015-06-23 Thread Josh Rosen
Mind filing a JIRA?
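
[Editorial note: for anyone hitting the same deadlock before a JIRA fix lands, the workaround Koert mentions below, running the test suites consecutively, is a one-line sbt setting.]

// build.sbt (sketch)
parallelExecution in Test := false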

On Tue, Jun 23, 2015 at 9:34 AM, Koert Kuipers  wrote:

> just a heads up, I was doing some basic coding using DataFrame, Row,
> StructType, etc. and I ended up with deadlocks in my sbt tests due to the
> usage of
> ScalaReflectionLock.synchronized in the spark sql code.
> the issue went away when I changed my tests to run consecutively...
>
>


Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java

2015-06-23 Thread Nipun Arora
I found the error so just posting on the list.

It seems broadcast variables cannot be declared as static fields; if you do,
you get a null pointer exception when the job runs on the cluster.
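
For reference, here is a minimal Java sketch of one way to avoid the static
field (the class name, map types and scoring logic below are made up for
illustration, not the original program): hold the Broadcast handle in an
instance field of the function so it is serialized with the closure and
resolved on each executor.

import java.util.HashMap;

import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;

import scala.Tuple3;

class GenerateScores implements Function<String, Tuple3<Long, Double, String>> {
    // Instance field, not static: shipped with the serialized function to the executors.
    private final Broadcast<HashMap<String, Double>> model;

    GenerateScores(Broadcast<HashMap<String, Double>> model) {
        this.model = model;
    }

    @Override
    public Tuple3<Long, Double, String> call(String s) throws Exception {
        HashMap<String, Double> hm = model.value();          // resolved on the executor
        Double score = hm.containsKey(s) ? hm.get(s) : 0.0;  // placeholder scoring logic
        return new Tuple3<Long, Double, String>(System.currentTimeMillis(), score, s);
    }
}

// Driver side (br is no longer a static field):
// Broadcast<HashMap<String, Double>> br = ssc.sparkContext().broadcast(hm);
// JavaDStream<Tuple3<Long, Double, String>> scored = matched.map(new GenerateScores(br));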

Thanks
Nipun

On Tue, Jun 23, 2015 at 11:08 AM, Nipun Arora 
wrote:

> btw. just for reference I have added the code in a gist:
>
> https://gist.github.com/nipunarora/ed987e45028250248edc
>
> and a stackoverflow reference here:
>
>
> http://stackoverflow.com/questions/31006490/broadcast-variable-null-pointer-exception-in-spark-streaming
>
> On Tue, Jun 23, 2015 at 11:01 AM, Nipun Arora 
> wrote:
>
>> Hi,
>>
>> I have a spark streaming application where I need to access a model saved
>> in a HashMap.
>> I have *no problems in running the same code with broadcast variables in
>> the local installation.* However I get a *null pointer* *exception* when
>> I deploy it on my spark test cluster.
>>
>>
>> I have stored a model in a HashMap which is
>> serializable. I use a broadcast variables declared as a global static
>> variable to broadcast this hashmap:
>>
>> public static Broadcast> br;
>>
>> HashMap hm = checkerObj.getModel(esserver, type);
>>
>> br = ssc.sparkContext().broadcast(hm);
>>
>>
>> I need to access this model in my mapper phase, and do some operation
>> based on the checkup. The following is a snippet of how I access the
>> broadcast variable.
>>
>>
>> JavaDStream> split = matched.map(new 
>> GenerateType2Scores());
>>
>>
>> class GenerateType2Scores implements Function> String>> {
>> @Override
>> public Tuple3 call(String s) throws Exception{
>>
>> Long time = Type2ViolationChecker.getMTS(s);
>> HashMap temphm= Type2ViolationChecker.br.value();
>>
>> Double score = Type2ViolationChecker.getAnomalyScore(temphm,s);
>> return new Tuple3(time,score, s);}
>> }
>>
>> The temphm should refer to the hashmap stored in the broadcast variable.
>> Can anyone help me understand what is the correct way to access broadcast
>> variables in JAVA?
>>
>> Thanks
>> Nipun
>>
>
>


SPARK-8566

2015-06-23 Thread Eric Friedman
I logged this Jira this morning:
https://issues.apache.org/jira/browse/SPARK-8566

I'm curious if any of the cognoscenti can advise as to a likely cause of
the problem?


Re: Limitations using SparkContext

2015-06-23 Thread Richard Marscher
Hi,

can you detail the symptom further? Was it that only 12 requests were
serviced and the other 440 timed out? I don't think that Spark is well
suited for this kind of workload, or at least the way it is being
represented. How long does a single request take Spark to complete?

Even with fair scheduling, you will only be able to have a fixed amount of
tasks running on Spark at once. Usually this is bounded by the max cores
setting in configuration. Since you mention local as a comparison point I
get the impression you are running Spark Standalone for cluster. The
implication, if this is reflective of your current setup, is that you
aren't going to get much concurrency for separate spray requests. Let's say
your max cores is 16 and your number of tasks/partitions per stage of your
spark DAG is 8. Then at any given time only 2 requests can be serviced. It
may also be the case that with fair scheduling that a single request gets
pre-empted after completing one stage of the DAG and has to wait to
continue instead of proceeding directly to the next stage.

This hypothesis would also support the observation that local is no better
than cluster, because you probably have even less concurrent spark tasks
available on the single local machine.


From the configuration docs, spark.cores.max (not set by default): when
running on a standalone deploy cluster or a Mesos cluster in "coarse-grained"
sharing mode, this is the maximum amount of CPU cores to request for the
application from across the cluster (not from each machine). If not set, the
default will be spark.deploy.defaultCores on Spark's standalone cluster
manager, or infinite (all available cores) on Mesos.
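
As an illustration only (the app name, core count, and pool name below are
made up, not taken from the thread), the knobs involved look roughly like
this:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class SprayServiceContext {
    public static JavaSparkContext create() {
        SparkConf conf = new SparkConf()
                .setAppName("spray-spark-service")
                .set("spark.cores.max", "32")            // total cores for this app across the cluster
                .set("spark.scheduler.mode", "FAIR");    // let stages from different requests interleave
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Work submitted on behalf of web requests can be tagged with a fair-scheduler pool:
        sc.setLocalProperty("spark.scheduler.pool", "web-requests");
        return sc;
    }
}

With max cores at 32 and 8 tasks per stage, roughly 4 requests could have a
stage running at once, following the same arithmetic as above.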

On Tue, Jun 23, 2015 at 12:44 PM, daunnc  wrote:

> So the situation is following: got a spray server, with a spark context
> available (fair scheduling in a cluster mode, via spark-submit). There are
> some http urls, which calling spark rdd, and collecting information from
> accumulo / hdfs / etc (using rdd). Noticed, that there is a sort of
> limitation, on requests:
>
> wrk -t8 -c50 -d30s "http://localhost:/…/";
> Running 30s test @ http://localhost:/…/
>   8 threads and 50 connections
>   Thread Stats   Avg  Stdev Max   +/- Stdev
> Latency 1.03s   523.30ms   1.70s50.00%
> Req/Sec 6.05  5.4920.00 71.58%
>   452 requests in 30.04s, 234.39KB read
>   Socket errors: connect 0, read 0, write 0, timeout 440
>
> So this happens on making some calls with spark rdd (not depends on called
> function), and in browser you can see ERR_EMPTY_RESPONSE
>
> Now the solution was to use cache, but want to know about this limitations,
> or mb some settings.
> This error happens in local mode and in cluster mode, so guess not depends
> on it.
>
> P.S. logs are clear (or simply don't know where to look, but stdout of a
> spar-submit in a client mode is clear).
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Limitations-using-SparkContext-tp23452.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Limitations using SparkContext

2015-06-23 Thread daunnc
The situation is the following: I have a spray server with a Spark context
available (fair scheduling, cluster mode, launched via spark-submit). Some
HTTP endpoints trigger Spark RDD operations that collect information from
Accumulo / HDFS / etc. I noticed that there seems to be a limit on requests:

wrk -t8 -c50 -d30s "http://localhost:/…/";
Running 30s test @ http://localhost:/…/
  8 threads and 50 connections
  Thread Stats   Avg  Stdev Max   +/- Stdev
Latency 1.03s   523.30ms   1.70s50.00%
Req/Sec 6.05  5.4920.00 71.58%
  452 requests in 30.04s, 234.39KB read
  Socket errors: connect 0, read 0, write 0, timeout 440

This happens when making calls that use Spark RDDs (regardless of which
function is called), and in the browser you see ERR_EMPTY_RESPONSE.

For now the workaround was to use a cache, but I'd like to understand this
limitation, or whether some settings can change it.
The error happens in both local mode and cluster mode, so I guess it doesn't
depend on that.

P.S. The logs are clean (or I simply don't know where to look, but the stdout
of spark-submit in client mode is clean).



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Limitations-using-SparkContext-tp23452.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



org.apache.spark.sql.ScalaReflectionLock

2015-06-23 Thread Koert Kuipers
just a heads up, i was doing some basic coding using DataFrame, Row,
StructType, etc. and i ended up with deadlocks in my sbt tests due to the
usage of
ScalaReflectionLock.synchronized in the spark sql code.
the issue went away when i changed my tests to run consecutively...


Re: Help optimising Spark SQL query

2015-06-23 Thread Sabarish Sasidharan
64GB in parquet could be many billions of rows because of the columnar
compression. And count distinct by itself is an expensive operation. This
is not just on Spark, even on Presto/Impala, you would see performance dip
with count distincts. And the cluster is not that powerful either.

The one issue here is that Spark has to sift through all the data to get to
just a week's worth. To achieve better performance you might want to
partition the data by date/week and then Spark wouldn't have to sift
through all the billions of rows to get to the millions it needs to
aggregate.
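
For illustration, a rough Java sketch of that one-time partitioning step
(assuming Spark 1.4's DataFrameWriter and a HiveContext; the paths and the
event_date column name are made up):

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.hive.HiveContext;

public class RepartitionUsageEvents {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext();    // configured via spark-submit
        HiveContext sqlContext = new HiveContext(sc.sc());

        DataFrame events = sqlContext.read().parquet("s3n://bucket/usage_events");

        // Derive a day column from the millisecond timestamp, mirroring the query's expression.
        DataFrame withDay = events.selectExpr(
                "*",
                "to_date(from_unixtime(cast(timestamp_millis / 1000 as bigint))) as event_date");

        withDay.write()
               .partitionBy("event_date")                // one directory per day
               .parquet("s3n://bucket/usage_events_by_day");

        // Queries that filter on event_date can then prune partitions instead of scanning all files:
        // sqlContext.read().parquet("s3n://bucket/usage_events_by_day")
        //           .filter("event_date between '2015-06-09' and '2015-06-16'");
    }
}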

Regards
Sab

On Tue, Jun 23, 2015 at 4:35 PM, James Aley  wrote:

> Thanks for the suggestions everyone, appreciate the advice.
>
> I tried replacing DISTINCT for the nested GROUP BY, running on 1.4 instead
> of 1.3, replacing the date casts with a "between" operation on the
> corresponding long constants instead and changing COUNT(*) to COUNT(1).
> None of these seem to have made any remarkable difference in running time
> for the query.
>
> I'll hook up YourKit and see if we can figure out where the CPU time is
> going, then post back.
>
> On 22 June 2015 at 16:01, Yin Huai  wrote:
>
>> Hi James,
>>
>> Maybe it's the DISTINCT causing the issue.
>>
>> I rewrote the query as follows. Maybe this one can finish faster.
>>
>> select
>>   sum(cnt) as uses,
>>   count(id) as users
>> from (
>>   select
>> count(*) cnt,
>> cast(id as string) as id
>>   from usage_events
>>   where
>> from_unixtime(cast(timestamp_millis/1000 as bigint)) between
>> '2015-06-09' and '2015-06-16'
>>   group by cast(id as string)
>> ) tmp
>>
>> Thanks,
>>
>> Yin
>>
>> On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke 
>> wrote:
>>
>>> Generally (not only spark sql specific) you should not cast in the where
>>> part of a sql query. It is also not necessary in your case. Getting rid of
>>> casts in the whole query will be also beneficial.
>>>
>>> On Mon, 22 Jun 2015 at 17:29, James Aley  wrote:
>>>
 Hello,

 A colleague of mine ran the following Spark SQL query:

 select
   count(*) as uses,
   count (distinct cast(id as string)) as users
 from usage_events
 where
   from_unixtime(cast(timestamp_millis/1000 as bigint))
 between '2015-06-09' and '2015-06-16'

 The table contains billions of rows, but totals only 64GB of data
 across ~30 separate files, which are stored as Parquet with LZO compression
 in S3.

 From the referenced columns:

 * id is Binary, which we cast to a String so that we can DISTINCT by
 it. (I was already told this will improve in a later release, in a separate
 thread.)
 * timestamp_millis is a long, containing a unix timestamp with
 millisecond resolution

 This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
 instances, using 20 executors, each with 4GB memory. I can see from
 monitoring tools that the CPU usage is at 100% on all nodes, but incoming
 network seems a bit low at 2.5MB/s, suggesting to me that this is 
 CPU-bound.

 Does that seem slow? Can anyone offer any ideas by glancing at the
 query as to why this might be slow? We'll profile it meanwhile and post
 back if we find anything ourselves.

 A side issue - I've found that this query, and others, sometimes
 completes but doesn't return any results. There appears to be no error that
 I can see in the logs, and Spark reports the job as successful, but the
 connected JDBC client (SQLWorkbenchJ in this case), just sits there forever
 waiting. I did a quick Google and couldn't find anyone else having similar
 issues.


 Many thanks,

 James.

>>>
>>
>


-- 

Architect - Big Data
Ph: +91 99805 99458

Manthan Systems | *Company of the year - Analytics (2014 Frost and Sullivan
India ICT)*
+++


Re: SQL vs. DataFrame API

2015-06-23 Thread Bob Corsaro
I've only tried it in python

On Tue, Jun 23, 2015 at 12:16 PM Ignacio Blasco 
wrote:

> That issue happens only in python dsl?
> On 23/6/2015 at 5:05 p.m., "Bob Corsaro"  wrote:
>
>> Thanks! The solution:
>>
>> https://gist.github.com/dokipen/018a1deeab668efdf455
>>
>> On Mon, Jun 22, 2015 at 4:33 PM Davies Liu  wrote:
>>
>>> Right now, we can not figure out which column you referenced in
>>> `select`, if there are multiple row with the same name in the joined
>>> DataFrame (for example, two `value`).
>>>
>>> A workaround could be:
>>>
>>> numbers2 = numbers.select(df.name, df.value.alias('other'))
>>> rows = numbers.join(numbers2,
>>> (numbers.name==numbers2.name) & (numbers.value !=
>>> numbers2.other),
>>> how="inner") \
>>>   .select(numbers.name, numbers.value, numbers2.other) \
>>>   .collect()
>>>
>>> On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco 
>>> wrote:
>>> > Sorry thought it was scala/spark
>>> >
>>> >> On 22/6/2015 at 9:49 p.m., "Bob Corsaro"  wrote:
>>> >>
>>> >> That's invalid syntax. I'm pretty sure pyspark is using a DSL to
>>> create a
>>> >> query here and not actually doing an equality operation.
>>> >>
>>> >> On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco 
>>> >> wrote:
>>> >>>
>>> >>> Probably you should use === instead of == and !== instead of !=
>>> >>>
>>> >>> Can anyone explain why the dataframe API doesn't work as I expect it
>>> to
>>> >>> here? It seems like the column identifiers are getting confused.
>>> >>>
>>> >>> https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
>>>
>>


Re: SQL vs. DataFrame API

2015-06-23 Thread Ignacio Blasco
That issue happens only in python dsl?
On 23/6/2015 at 5:05 p.m., "Bob Corsaro"  wrote:

> Thanks! The solution:
>
> https://gist.github.com/dokipen/018a1deeab668efdf455
>
> On Mon, Jun 22, 2015 at 4:33 PM Davies Liu  wrote:
>
>> Right now, we can not figure out which column you referenced in
>> `select`, if there are multiple row with the same name in the joined
>> DataFrame (for example, two `value`).
>>
>> A workaround could be:
>>
>> numbers2 = numbers.select(df.name, df.value.alias('other'))
>> rows = numbers.join(numbers2,
>> (numbers.name==numbers2.name) & (numbers.value !=
>> numbers2.other),
>> how="inner") \
>>   .select(numbers.name, numbers.value, numbers2.other) \
>>   .collect()
>>
>> On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco 
>> wrote:
>> > Sorry thought it was scala/spark
>> >
>> >> On 22/6/2015 at 9:49 p.m., "Bob Corsaro"  wrote:
>> >>
>> >> That's invalid syntax. I'm pretty sure pyspark is using a DSL to
>> create a
>> >> query here and not actually doing an equality operation.
>> >>
>> >> On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco 
>> >> wrote:
>> >>>
>> >>> Probably you should use === instead of == and !== instead of !=
>> >>>
>> >>> Can anyone explain why the dataframe API doesn't work as I expect it
>> to
>> >>> here? It seems like the column identifiers are getting confused.
>> >>>
>> >>> https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
>>
>


Should I keep memory dedicated for HDFS and Spark on cluster nodes?

2015-06-23 Thread maxdml
I'm wondering if there is a real benefit for splitting my memory in two for
the datanode/workers.

Datanodes and OS needs memory to perform their business. I suppose there
could be loss of performance if they came to compete for memory with the
worker(s).

Any opinion? :-)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Should-I-keep-memory-dedicated-for-HDFS-and-Spark-on-cluster-nodes-tp23451.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java

2015-06-23 Thread Nipun Arora
I don't think I have explicitly check-pointed anywhere. Unless it's
internal in some interface, I don't believe the application is checkpointed.

Thanks for the suggestion though..

Nipun

On Tue, Jun 23, 2015 at 11:05 AM, Benjamin Fradet  wrote:

> Are you using checkpointing?
>
> I had a similar issue when recreating a streaming context from checkpoint
> as broadcast variables are not checkpointed.
> On 23 Jun 2015 5:01 pm, "Nipun Arora"  wrote:
>
>> Hi,
>>
>> I have a spark streaming application where I need to access a model saved
>> in a HashMap.
>> I have *no problems in running the same code with broadcast variables in
>> the local installation.* However I get a *null pointer* *exception* when
>> I deploy it on my spark test cluster.
>>
>>
>> I have stored a model in a HashMap which is
>> serializable. I use a broadcast variables declared as a global static
>> variable to broadcast this hashmap:
>>
>> public static Broadcast> br;
>>
>> HashMap hm = checkerObj.getModel(esserver, type);
>>
>> br = ssc.sparkContext().broadcast(hm);
>>
>>
>> I need to access this model in my mapper phase, and do some operation
>> based on the checkup. The following is a snippet of how I access the
>> broadcast variable.
>>
>>
>> JavaDStream> split = matched.map(new 
>> GenerateType2Scores());
>>
>>
>> class GenerateType2Scores implements Function> String>> {
>> @Override
>> public Tuple3 call(String s) throws Exception{
>>
>> Long time = Type2ViolationChecker.getMTS(s);
>> HashMap temphm= Type2ViolationChecker.br.value();
>>
>> Double score = Type2ViolationChecker.getAnomalyScore(temphm,s);
>> return new Tuple3(time,score, s);}
>> }
>>
>> The temphm should refer to the hashmap stored in the broadcast variable.
>> Can anyone help me understand what is the correct way to access broadcast
>> variables in JAVA?
>>
>> Thanks
>> Nipun
>>
>


Re: SQL vs. DataFrame API

2015-06-23 Thread Bob Corsaro
Thanks! The solution:

https://gist.github.com/dokipen/018a1deeab668efdf455

On Mon, Jun 22, 2015 at 4:33 PM Davies Liu  wrote:

> Right now, we can not figure out which column you referenced in
> `select`, if there are multiple row with the same name in the joined
> DataFrame (for example, two `value`).
>
> A workaround could be:
>
> numbers2 = numbers.select(df.name, df.value.alias('other'))
> rows = numbers.join(numbers2,
> (numbers.name==numbers2.name) & (numbers.value !=
> numbers2.other),
> how="inner") \
>   .select(numbers.name, numbers.value, numbers2.other) \
>   .collect()
>
> On Mon, Jun 22, 2015 at 12:53 PM, Ignacio Blasco 
> wrote:
> > Sorry thought it was scala/spark
> >
> > On 22/6/2015 at 9:49 p.m., "Bob Corsaro"  wrote:
> >>
> >> That's invalid syntax. I'm pretty sure pyspark is using a DSL to create
> a
> >> query here and not actually doing an equality operation.
> >>
> >> On Mon, Jun 22, 2015 at 3:43 PM Ignacio Blasco 
> >> wrote:
> >>>
> >>> Probably you should use === instead of == and !== instead of !=
> >>>
> >>> Can anyone explain why the dataframe API doesn't work as I expect it to
> >>> here? It seems like the column identifiers are getting confused.
> >>>
> >>> https://gist.github.com/dokipen/4b324a7365ae87b7b0e5
>


Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java

2015-06-23 Thread Benjamin Fradet
Are you using checkpointing?

I had a similar issue when recreating a streaming context from checkpoint
as broadcast variables are not checkpointed.
On 23 Jun 2015 5:01 pm, "Nipun Arora"  wrote:

> Hi,
>
> I have a spark streaming application where I need to access a model saved
> in a HashMap.
> I have *no problems in running the same code with broadcast variables in
> the local installation.* However I get a *null pointer* *exception* when
> I deploy it on my spark test cluster.
>
>
> I have stored a model in a HashMap which is
> serializable. I use a broadcast variables declared as a global static
> variable to broadcast this hashmap:
>
> public static Broadcast> br;
>
> HashMap hm = checkerObj.getModel(esserver, type);
>
> br = ssc.sparkContext().broadcast(hm);
>
>
> I need to access this model in my mapper phase, and do some operation
> based on the checkup. The following is a snippet of how I access the
> broadcast variable.
>
>
> JavaDStream> split = matched.map(new 
> GenerateType2Scores());
>
>
> class GenerateType2Scores implements Function String>> {
> @Override
> public Tuple3 call(String s) throws Exception{
>
> Long time = Type2ViolationChecker.getMTS(s);
> HashMap temphm= Type2ViolationChecker.br.value();
>
> Double score = Type2ViolationChecker.getAnomalyScore(temphm,s);
> return new Tuple3(time,score, s);}
> }
>
> The temphm should refer to the hashmap stored in the broadcast variable.
> Can anyone help me understand what is the correct way to access broadcast
> variables in JAVA?
>
> Thanks
> Nipun
>


Re: [Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java

2015-06-23 Thread Nipun Arora
btw. just for reference I have added the code in a gist:

https://gist.github.com/nipunarora/ed987e45028250248edc

and a stackoverflow reference here:

http://stackoverflow.com/questions/31006490/broadcast-variable-null-pointer-exception-in-spark-streaming

On Tue, Jun 23, 2015 at 11:01 AM, Nipun Arora 
wrote:

> Hi,
>
> I have a spark streaming application where I need to access a model saved
> in a HashMap.
> I have *no problems in running the same code with broadcast variables in
> the local installation.* However I get a *null pointer* *exception* when
> I deploy it on my spark test cluster.
>
>
> I have stored a model in a HashMap which is
> serializable. I use a broadcast variables declared as a global static
> variable to broadcast this hashmap:
>
> public static Broadcast> br;
>
> HashMap hm = checkerObj.getModel(esserver, type);
>
> br = ssc.sparkContext().broadcast(hm);
>
>
> I need to access this model in my mapper phase, and do some operation
> based on the checkup. The following is a snippet of how I access the
> broadcast variable.
>
>
> JavaDStream> split = matched.map(new 
> GenerateType2Scores());
>
>
> class GenerateType2Scores implements Function String>> {
> @Override
> public Tuple3 call(String s) throws Exception{
>
> Long time = Type2ViolationChecker.getMTS(s);
> HashMap temphm= Type2ViolationChecker.br.value();
>
> Double score = Type2ViolationChecker.getAnomalyScore(temphm,s);
> return new Tuple3(time,score, s);}
> }
>
> The temphm should refer to the hashmap stored in the broadcast variable.
> Can anyone help me understand what is the correct way to access broadcast
> variables in JAVA?
>
> Thanks
> Nipun
>


[Spark Streaming] Null Pointer Exception when accessing broadcast variable to store a hashmap in Java

2015-06-23 Thread Nipun Arora
Hi,

I have a spark streaming application where I need to access a model saved
in a HashMap.
I have *no problems in running the same code with broadcast variables in
the local installation.* However I get a *null pointer* *exception* when I
deploy it on my spark test cluster.


I have stored a model in a HashMap which is
serializable. I use a broadcast variables declared as a global static
variable to broadcast this hashmap:

public static Broadcast> br;

HashMap hm = checkerObj.getModel(esserver, type);

br = ssc.sparkContext().broadcast(hm);


I need to access this model in my mapper phase, and do some operation based
on the checkup. The following is a snippet of how I access the broadcast
variable.


JavaDStream> split = matched.map(new
GenerateType2Scores());


class GenerateType2Scores implements Function> {
@Override
public Tuple3 call(String s) throws Exception{

Long time = Type2ViolationChecker.getMTS(s);
HashMap temphm= Type2ViolationChecker.br.value();

Double score = Type2ViolationChecker.getAnomalyScore(temphm,s);
return new Tuple3(time,score, s);}
}

The temphm should refer to the hashmap stored in the broadcast variable.
Can anyone help me understand what is the correct way to access broadcast
variables in JAVA?

Thanks
Nipun


java.lang.IllegalArgumentException: A metric named ... already exists

2015-06-23 Thread Juan Rodríguez Hortalá
Hi,

I'm running a program in Spark 1.4 where several Spark Streaming contexts
are created from the same Spark context. As pointed in
https://spark.apache.org/docs/latest/streaming-programming-guide.html each
Spark Streaming context is stopped before creating the next Spark Streaming
context. The program works ok, but I get exceptions like the following when
a new Spark Streaming context is created

15/06/23 16:34:51 INFO MetricsSystem: Metrics already registered
java.lang.IllegalArgumentException: A metric named
local-1435070090627.driver.SampleStreamingTest.StreamingMetrics.streaming.lastReceivedBatch_processingEndTime
already exists
at
com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:91)
at
com.codahale.metrics.MetricRegistry.registerAll(MetricRegistry.java:385)
at
com.codahale.metrics.MetricRegistry.register(MetricRegistry.java:85)


Is this something to be concerned, or just a minor nuisance?

Thanks a lot in advance.

Greetings,

Juan Rodriguez Hortala


Re: [Spark Streaming] Runtime Error in call to max function for JavaPairRDD

2015-06-23 Thread Nipun Arora
Thanks, will try this out and get back...

On Tue, Jun 23, 2015 at 2:30 AM, Tathagata Das  wrote:

> Try adding the provided scopes
>
>  <dependency>
>    <groupId>org.apache.spark</groupId>
>    <artifactId>spark-core_2.10</artifactId>
>    <version>1.4.0</version>
>    <scope>provided</scope>
>  </dependency>
>  <dependency>
>    <groupId>org.apache.spark</groupId>
>    <artifactId>spark-streaming_2.10</artifactId>
>    <version>1.4.0</version>
>    <scope>provided</scope>
>  </dependency>
>
> This prevents these artifacts from being included in the assembly JARs.
>
> See scope
>
> https://maven.apache.org/guides/introduction/introduction-to-dependency-mechanism.html#Dependency_Scope
>
> On Mon, Jun 22, 2015 at 10:28 AM, Nipun Arora 
> wrote:
>
>> Hi Tathagata,
>>
>> I am attaching a snapshot of my pom.xml. It would help immensely, if I
>> can include max, and min values in my mapper phase.
>>
>> The question is still open at :
>> http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796
>>
>> I see that there is a bug report filed for a similar error as well:
>> https://issues.apache.org/jira/browse/SPARK-3266
>>
>> Please let me know, how I can get the same version of spark streaming in
>> my assembly.
>> I am using the following spark version:
>> http://www.apache.org/dyn/closer.cgi/spark/spark-1.4.0/spark-1.4.0-bin-hadoop2.6.tgz
>> .. no compilation, just an untar and use the spark-submit script in a local
>> install.
>>
>>
>> I still get the same error.
>>
>> Exception in thread "JobGenerator" java.lang.NoSuchMethodError: 
>> org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
>>
>>
>>  <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-core_2.10</artifactId>
>>   <version>1.4.0</version>
>>  </dependency>
>>  <dependency>
>>   <groupId>org.apache.spark</groupId>
>>   <artifactId>spark-streaming_2.10</artifactId>
>>   <version>1.4.0</version>
>>  </dependency>
>>
>> Thanks
>>
>> Nipun
>>
>>
>> On Thu, Jun 18, 2015 at 11:16 PM, Nipun Arora 
>> wrote:
>>
>>> Hi Tathagata,
>>>
>>> When you say please mark spark-core and spark-streaming as dependencies
>>> how do you mean?
>>> I have installed the pre-build spark-1.4 for Hadoop 2.6 from spark
>>> downloads. In my maven pom.xml, I am using version 1.4 as described.
>>>
>>> Please let me know how I can fix that?
>>>
>>> Thanks
>>> Nipun
>>>
>>> On Thu, Jun 18, 2015 at 4:22 PM, Tathagata Das 
>>> wrote:
>>>
 I think you may be including a different version of Spark Streaming in
 your assembly. Please mark spark-core nd spark-streaming as provided
 dependencies. Any installation of Spark will automatically provide Spark in
 the classpath so you do not have to bundle it.

 On Thu, Jun 18, 2015 at 8:44 AM, Nipun Arora 
 wrote:

> Hi,
>
> I have the following piece of code, where I am trying to transform a
> spark stream and add min and max to it of eachRDD. However, I get an error
> saying max call does not exist, at run-time (compiles properly). I am 
> using
> spark-1.4
>
> I have added the question to stackoverflow as well:
> http://stackoverflow.com/questions/30902090/adding-max-and-min-in-spark-stream-in-java/30909796#30909796
>
> Any help is greatly appreciated :)
>
> Thanks
> Nipun
>
> JavaPairDStream, Tuple3> 
> sortedtsStream = transformedMaxMintsStream.transformToPair(new Sort2());
>
> sortedtsStream.foreach(
> new Function, Tuple3 Long, Long>>, Void>() {
> @Override
> public Void call(JavaPairRDD, 
> Tuple3> tuple2Tuple3JavaPairRDD) throws Exception {
> List, 
> Tuple3> >templist = tuple2Tuple3JavaPairRDD.collect();
> for(Tuple2, 
> Tuple3> tuple :templist){
>
> Date date = new Date(tuple._1._1);
> int pattern = tuple._1._2;
> int count = tuple._2._1();
> Date maxDate = new Date(tuple._2._2());
> Date minDate = new Date(tuple._2._2());
> System.out.println("TimeSlot: " + date.toString() + " 
> Pattern: " + pattern + " Count: " + count + " Max: " + maxDate.toString() 
> + " Min: " + minDate.toString());
>
> }
> return null;
> }
> }
> );
>
> Error:
>
>
> 15/06/18 11:05:06 INFO BlockManagerInfo: Added input-0-1434639906000 in 
> memory on localhost:42829 (size: 464.0 KB, free: 264.9 MB)15/06/18 
> 11:05:06 INFO BlockGenerator: Pushed block input-0-1434639906000Exception 
> in thread "JobGenerator" java.lang.NoSuchMethodError: 
> org.apache.spark.api.java.JavaPairRDD.max(Ljava/util/Comparator;)Lscala/Tuple2;
> at 
> org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:346)
> at 
> org.necla.ngla.spark_streaming.MinMax.call(Type4ViolationChecker.java:340)
> at 
> org.apache.spark.streaming.api.java.JavaDStreamLike$class.scalaTransform$3(JavaDStreamLike.scala:360)
> 

Re: workaround for groupByKey

2015-06-23 Thread Silvio Fiorito
It all depends on what it is you need to do with the pages. If you’re just 
going to be collecting them then it’s really not much different than a 
groupByKey. If instead you’re looking to derive some other value from the 
series of pages then you could potentially partition by user id and run a 
mapPartitions or one of the other combineByKey APIs?
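
As a hedged Java sketch of the partition-then-mapPartitions route (the
partition count, names, and the derived per-user value are illustrative, and
this only pays off if the derived value is much smaller than the raw list of
pages):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

// visits is assumed to be a JavaPairRDD<String, String> of (user_id, url) pairs.
JavaPairRDD<String, String> partitioned =
        visits.partitionBy(new HashPartitioner(200));    // all records for a user land in one partition

JavaPairRDD<String, Integer> urlCountPerUser = partitioned.mapPartitionsToPair(
        new PairFlatMapFunction<Iterator<Tuple2<String, String>>, String, Integer>() {
            @Override
            public Iterable<Tuple2<String, Integer>> call(Iterator<Tuple2<String, String>> it) {
                // Group locally within the partition, then emit only the derived value per user.
                Map<String, List<String>> byUser = new HashMap<String, List<String>>();
                while (it.hasNext()) {
                    Tuple2<String, String> visit = it.next();
                    List<String> urls = byUser.get(visit._1());
                    if (urls == null) {
                        urls = new ArrayList<String>();
                        byUser.put(visit._1(), urls);
                    }
                    urls.add(visit._2());
                }
                List<Tuple2<String, Integer>> out = new ArrayList<Tuple2<String, Integer>>();
                for (Map.Entry<String, List<String>> e : byUser.entrySet()) {
                    out.add(new Tuple2<String, Integer>(e.getKey(), e.getValue().size()));  // placeholder derivation
                }
                return out;
            }
        });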


From: Jianguo Li
Date: Tuesday, June 23, 2015 at 9:46 AM
To: Silvio Fiorito
Cc: "user@spark.apache.org"
Subject: Re: workaround for groupByKey

Thanks. Yes, unfortunately, they all need to be grouped. I guess I can 
partition the records by user id. However, I have millions of users; do you
think partition by user id will help?

Jianguo

On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito 
mailto:silvio.fior...@granturing.com>> wrote:
You’re right of course, I’m sorry. I was typing before thinking about what you 
actually asked!

On a second thought, what is the ultimate outcome for what you want the 
sequence of pages for? Do they need to actually all be grouped? Could you 
instead partition by user id then use a mapPartitions perhaps?

From: Jianguo Li
Date: Monday, June 22, 2015 at 6:21 PM
To: Silvio Fiorito
Cc: "user@spark.apache.org"
Subject: Re: workaround for groupByKey

Thanks for your suggestion. I guess aggregateByKey is similar to combineByKey. 
I read in the Learning Spark book:

We can disable map-side aggregation in combineByKey() if we know that our data 
won’t benefit from it. For example, groupByKey() disables map-side aggregation 
as the aggregation function (appending to a list) does not save any space. If 
we want to disable map-side combines, we need to specify the partitioner; for 
now you can just use the partitioner on the source RDD by passing rdd.partitioner.

It seems that when the map-side aggregation function is to append something to 
a list (as opposed to summing over all the numbers), then this map-side 
aggregation does not offer any benefit since appending to a list does not save 
any space. Is my understanding correct?

Thanks,

Jianguo

On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito 
mailto:silvio.fior...@granturing.com>> wrote:
You can use aggregateByKey as one option:

val input: RDD[Int, String] = ...

val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) => a += b, (a, 
b) => a ++ b)

From: Jianguo Li
Date: Monday, June 22, 2015 at 5:12 PM
To: "user@spark.apache.org"
Subject: workaround for groupByKey

Hi,

I am processing an RDD of key-value pairs. The key is an user_id, and the value 
is an website url the user has ever visited.

Since I need to know all the urls each user has visited, I am  tempted to call 
the groupByKey on this RDD. However, since there could be millions of users and 
urls, the shuffling caused by groupByKey proves to be a major bottleneck to get 
the job done. Is there any workaround? I want to end up with an RDD of 
key-value pairs, where the key is an user_id, the value is a list of all the 
urls visited by the user.

Thanks,

Jianguo




Spark launching without all of the requested YARN resources

2015-06-23 Thread Arun Luthra
Sometimes if my Hortonworks yarn-enabled cluster is fairly busy, Spark (via
spark-submit) will begin its processing even though it apparently did not
get all of the requested resources; it is running very slowly.

Is there a way to force Spark/YARN to only begin when it has the full set
of resources that I request?

Thanks,
Arun


Re: Shutdown with streaming driver running in cluster broke master web UI permanently

2015-06-23 Thread scar scar
Thank you Tathagata,

It is great to know about this issue, but our problem is a little bit
different. We have 3 nodes in our Spark cluster, and when the Zookeeper
leader dies, the Master Spark gets shut down, and remains down, but a new
master gets elected and loads the UI. I think if the problem was the
event logging, the new master would have failed as well. Or maybe I am wrong.

On Tue, Jun 23, 2015 at 3:00 AM, Tathagata Das  wrote:

> Maybe this is a known issue with spark streaming and master web ui.
> Disable event logging, and it should be fine.
>
> https://issues.apache.org/jira/browse/SPARK-6270
>
> On Mon, Jun 22, 2015 at 8:54 AM, scar scar  wrote:
>
>> Sorry I was on vacation for a few days. Yes, it is on. This is what I
>> have in the logs:
>>
>> 15/06/22 10:44:00 INFO ClientCnxn: Unable to read additional data from
>> server sessionid 0x14dd82e22f70ef1, likely server has closed socket,
>> closing socket connection and attempting reconnect
>> 15/06/22 10:44:00 INFO ClientCnxn: Unable to read additional data from
>> server sessionid 0x24dc5a319b40090, likely server has closed socket,
>> closing socket connection and attempting reconnect
>> 15/06/22 10:44:01 INFO ConnectionStateManager: State change: SUSPENDED
>> 15/06/22 10:44:01 INFO ConnectionStateManager: State change: SUSPENDED
>> 15/06/22 10:44:01 WARN ConnectionStateManager: There are no
>> ConnectionStateListeners registered.
>> 15/06/22 10:44:01 INFO ZooKeeperLeaderElectionAgent: We have lost
>> leadership
>> 15/06/22 10:44:01 ERROR Master: Leadership has been revoked -- master
>> shutting down.
>>
>>
>> On Thu, Jun 11, 2015 at 8:59 PM, Tathagata Das 
>> wrote:
>>
>>> Do you have the event logging enabled?
>>>
>>> TD
>>>
>>> On Thu, Jun 11, 2015 at 11:24 AM, scar0909  wrote:
>>>
 I have the same problem. i realized that the master spark becomes
 unresponsive when we kill the leader zookeeper (of course i assigned the
 leader election task to the zookeeper). please let me know if you have
 any
 devlepments.



 --
 View this message in context:
 http://apache-spark-user-list.1001560.n3.nabble.com/Shutdown-with-streaming-driver-running-in-cluster-broke-master-web-UI-permanently-tp4149p23284.html
 Sent from the Apache Spark User List mailing list archive at Nabble.com.

 -
 To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
 For additional commands, e-mail: user-h...@spark.apache.org


>>>
>>
>


Re: workaround for groupByKey

2015-06-23 Thread Jianguo Li
Thanks. Yes, unfortunately, they all need to be grouped. I guess I can
partition the records by user id. However, I have millions of users; do you
think partition by user id will help?

Jianguo

On Mon, Jun 22, 2015 at 6:28 PM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

>   You’re right of course, I’m sorry. I was typing before thinking about
> what you actually asked!
>
>  On a second thought, what is the ultimate outcome for what you want the
> sequence of pages for? Do they need to actually all be grouped? Could you
> instead partition by user id then use a mapPartitions perhaps?
>
>   From: Jianguo Li
> Date: Monday, June 22, 2015 at 6:21 PM
> To: Silvio Fiorito
> Cc: "user@spark.apache.org"
> Subject: Re: workaround for groupByKey
>
>   Thanks for your suggestion. I guess aggregateByKey is similar to
> combineByKey. I read in the Learning Spark book:
>
>  *We can disable map-side aggregation in combineByKey() if we know that
> our data won’t benefit from it. For example, groupByKey() disables map-side
> aggregation as the aggregation function (appending to a list) does not save
> any space. If we want to disable map-side combines, we need to specify the
> partitioner; for now you can just use the partitioner on the source RDD by
> passing rdd.partitioner*
>
>  It seems that when the map-side aggregation function is to append
> something to a list (as opposed to summing over all the numbers), then this
> map-side aggregation does not offer any benefit since appending to a list
> does not save any space. Is my understanding correct?
>
>  Thanks,
>
>  Jianguo
>
> On Mon, Jun 22, 2015 at 4:43 PM, Silvio Fiorito <
> silvio.fior...@granturing.com> wrote:
>
>>  You can use aggregateByKey as one option:
>>
>>  val input: RDD[Int, String] = ...
>>
>>  val test = input.aggregateByKey(ListBuffer.empty[String])((a, b) => a
>> += b, (a, b) => a ++ b)
>>
>>   From: Jianguo Li
>> Date: Monday, June 22, 2015 at 5:12 PM
>> To: "user@spark.apache.org"
>> Subject: workaround for groupByKey
>>
>>   Hi,
>>
>>  I am processing an RDD of key-value pairs. The key is an user_id, and
>> the value is an website url the user has ever visited.
>>
>>  Since I need to know all the urls each user has visited, I am  tempted
>> to call the groupByKey on this RDD. However, since there could be millions
>> of users and urls, the shuffling caused by groupByKey proves to be a major
>> bottleneck to get the job done. Is there any workaround? I want to end up
>> with an RDD of key-value pairs, where the key is an user_id, the value is a
>> list of all the urls visited by the user.
>>
>>  Thanks,
>>
>>  Jianguo
>>
>
>


Re: Spark Streaming: limit number of nodes

2015-06-23 Thread Wojciech Pituła
I can not. I've already limited the number of cores to 10, so it gets 5
executors with 2 cores each...

wt., 23.06.2015 o 13:45 użytkownik Akhil Das 
napisał:

> Use *spark.cores.max* to limit the CPU per job, then you can easily
> accommodate your third job also.
>
> Thanks
> Best Regards
>
> On Tue, Jun 23, 2015 at 5:07 PM, Wojciech Pituła 
> wrote:
>
>> I have set up small standalone cluster: 5 nodes, every node has 5GB of
>> memory an 8 cores. As you can see, node doesn't have much RAM.
>>
>> I have 2 streaming apps, first one is configured to use 3GB of memory per
>> node and second one uses 2GB per node.
>>
>> My problem is, that smaller app could easily run on 2 or 3 nodes, instead
>> of 5 so I could lanuch third app.
>>
>> Is it possible to limit number of nodes(executors) that app wil get from
>> standalone cluster?
>>
>
>


Re: Spark Streaming: limit number of nodes

2015-06-23 Thread Akhil Das
Use *spark.cores.max* to limit the CPU per job, then you can easily
accommodate your third job also.

Thanks
Best Regards

On Tue, Jun 23, 2015 at 5:07 PM, Wojciech Pituła  wrote:

> I have set up small standalone cluster: 5 nodes, every node has 5GB of
> memory an 8 cores. As you can see, node doesn't have much RAM.
>
> I have 2 streaming apps, first one is configured to use 3GB of memory per
> node and second one uses 2GB per node.
>
> My problem is, that smaller app could easily run on 2 or 3 nodes, instead
> of 5 so I could lanuch third app.
>
> Is it possible to limit number of nodes(executors) that app wil get from
> standalone cluster?
>


Spark Streaming: limit number of nodes

2015-06-23 Thread Wojciech Pituła
I have set up a small standalone cluster: 5 nodes, each with 5GB of memory
and 8 cores. As you can see, the nodes don't have much RAM.

I have 2 streaming apps; the first one is configured to use 3GB of memory per
node and the second one uses 2GB per node.

My problem is that the smaller app could easily run on 2 or 3 nodes instead
of 5, so that I could launch a third app.

Is it possible to limit the number of nodes (executors) that an app will get
from the standalone cluster?


RE: Code review - Spark SQL command-line client for Cassandra

2015-06-23 Thread Matthew Johnson
Awesome, thanks Pawan – for now I’ll give spark-notebook a go until
Zeppelin catches up to Spark 1.4 (and when Zeppelin has a binary release –
my PC doesn’t seem too happy about building a Node.js app from source).
Thanks for the detailed instructions!!





*From:* pawan kumar [mailto:pkv...@gmail.com]
*Sent:* 22 June 2015 18:53
*To:* Matthew Johnson
*Cc:* Silvio Fiorito; Mohammed Guller; shahid ashraf; user
*Subject:* Re: Code review - Spark SQL command-line client for Cassandra



Hi Matthew,



you could add the dependencies yourself by using the %dep command in
zeppelin ( https://zeppelin.incubator.apache.org/docs/interpreter/spark.html).
I have not tried with zeppelin but have used spark-notebook
 and got Cassandra
connector working. Below have provided samples.



*In Zeppelin: (Not Tested)*



%dep z.load("com.datastax.spark:spark-cassandra-connector_2.11:1.4.0-M1")



Note: in order for Spark and Cassandra to work together, the Spark,
Spark-Cassandra-Connector, and spark-notebook Spark versions should match. In
the case above it was 1.2.0.



*If using spark-notebook: (Tested & works)*

Installed :

1.   Apache Spark 1.2.0

2.   Cassandra DSE - 1 node (just Cassandra and no analytics)

3.   Notebook:

wget
https://s3.eu-central-1.amazonaws.com/spark-notebook/tgz/spark-notebook-0.4.3-scala-2.10.4-spark-1.2.0-hadoop-2.4.0.tgz



Once notebook have been started :

http://ec2-xx-x-xx-xxx.us-west-x.compute.amazonaws.com:9000/#clusters



Select Standalone:

In SparkConf : update the spark master ip to EC2 : internal DNS name.



*In Spark Notebook:*

:dp "com.datastax.spark" % "spark-cassandra-connector_2.10" % "1.2.0-rc3"



import com.datastax.spark.connector._

import com.datastax.spark.connector.rdd.CassandraRDD



val cassandraHost:String = "localhost"

reset(lastChanges = _.set("spark.cassandra.connection.host", cassandraHost))

val rdd = sparkContext.cassandraTable("excelsior","test")

rdd.toArray.foreach(println)



Note: in order for Spark and Cassandra to work together, the Spark,
Spark-Cassandra-Connector, and spark-notebook Spark versions should match. In
the case above it was 1.2.0.











On Mon, Jun 22, 2015 at 9:52 AM, Matthew Johnson 
wrote:

Hi Pawan,



Looking at the changes for that git pull request, it looks like it just
pulls in the dependency (and transitives) for “spark-cassandra-connector”.
Since I am having to build Zeppelin myself anyway, would it be ok to just
add this myself for the connector for 1.4.0 (as found here
http://search.maven.org/#artifactdetails%7Ccom.datastax.spark%7Cspark-cassandra-connector_2.11%7C1.4.0-M1%7Cjar)?
What exactly is it that does not currently exist for Spark 1.4?



Thanks,

Matthew



*From:* pawan kumar [mailto:pkv...@gmail.com]
*Sent:* 22 June 2015 17:19
*To:* Silvio Fiorito
*Cc:* Mohammed Guller; Matthew Johnson; shahid ashraf; user@spark.apache.org
*Subject:* Re: Code review - Spark SQL command-line client for Cassandra



Hi,



Zeppelin has a cassandra-spark-connector built into the build. I have not
tried it yet may be you could let us know.



https://github.com/apache/incubator-zeppelin/pull/79



To build a Zeppelin version with the *Datastax Spark/Cassandra connector
*

mvn clean package *-Pcassandra-spark-1.x* -Dhadoop.version=xxx -Phadoop-x.x
-DskipTests

Right now the Spark/Cassandra connector is available for *Spark 1.1* and *Spark
1.2*. Support for *Spark 1.3* is not released yet (*but you can build your
own Spark/Cassandra connector version **1.3.0-SNAPSHOT*). Support for *Spark
1.4* does not exist yet

Please do not forget to add -Dspark.cassandra.connection.host=xxx to the
*ZEPPELIN_JAVA_OPTS*parameter in *conf/zeppelin-env.sh* file. Alternatively
you can add this parameter in the parameter list of the *Spark interpreter* on
the GUI



-Pawan











On Mon, Jun 22, 2015 at 9:04 AM, Silvio Fiorito <
silvio.fior...@granturing.com> wrote:

Yes, just put the Cassandra connector on the Spark classpath and set the
connector config properties in the interpreter settings.



*From: *Mohammed Guller
*Date: *Monday, June 22, 2015 at 11:56 AM
*To: *Matthew Johnson, shahid ashraf


*Cc: *"user@spark.apache.org"
*Subject: *RE: Code review - Spark SQL command-line client for Cassandra



I haven’t tried using Zeppelin with Spark on Cassandra, so can’t say for
sure, but it should not be difficult.



Mohammed



*From:* Matthew Johnson [mailto:matt.john...@algomi.com
]
*Sent:* Monday, June 22, 2015 2:15 AM
*To:* Mohammed Guller; shahid ashraf
*Cc:* user@spark.apache.org
*Subject:* RE: Code review - Spark SQL command-line client for Cassandra



Thanks Mohammed, it’s good to know I’m not alone!



How easy is it to integrate Zeppelin with Spark on Cassandra? It looks like
it would only support Hadoop out of the box. Is it just a case of dropping
the Cassandra Connector onto the Spark classpath?



Cheers,

Matthew



*From:* Mohammed Guller [mailto:m

Re: Any way to retrieve time of message arrival to Kafka topic, in Spark Streaming?

2015-06-23 Thread Dmitry Goldenberg
Yes, Akhil. We already have an origination timestamp in the body of the message 
when we send it. But we can't guarantee the network speed nor a precise enough 
synchronization of clocks across machines.

Pulling the timestamp from Kafka itself would be a step forward although the 
broker is most likely going to be not collocated with the consumers, so this 
would still be imprecise.
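
For reference, the keyed-message idea would look roughly like this with the
old Kafka 0.8 producer API (a hedged sketch only; the topic name and producer
construction are assumed to exist elsewhere, and the key still reflects the
producer's clock rather than broker arrival time):

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;

public class TimestampKeyedSender {
    private final Producer<String, String> producer;   // built elsewhere from a ProducerConfig

    public TimestampKeyedSender(Producer<String, String> producer) {
        this.producer = producer;
    }

    public void send(String topic, String payload) {
        // Put the send time in the key so the consumer can read it from the
        // (key, value) pair without parsing the payload.
        String sendTimeMillis = String.valueOf(System.currentTimeMillis());
        producer.send(new KeyedMessage<String, String>(topic, sendTimeMillis, payload));
    }
}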

> On Jun 23, 2015, at 3:46 AM, Akhil Das  wrote:
> 
> May be while producing the messages, you can make it as a keyedMessage with 
> the timestamp as key and on the consumer end you can easily identify the key 
> (which will be the timestamp) from the message. If the network is fast 
> enough, then i think there could be a small millisecond lag.
> 
> Thanks
> Best Regards
> 
>> On Tue, Jun 23, 2015 at 10:22 AM, dgoldenberg  
>> wrote:
>> Is there any way to retrieve the time of each message's arrival into a Kafka
>> topic, when streaming in Spark, whether with receiver-based or direct
>> streaming?
>> 
>> Thanks.
>> 
>> 
>> 
>> --
>> View this message in context: 
>> http://apache-spark-user-list.1001560.n3.nabble.com/Any-way-to-retrieve-time-of-message-arrival-to-Kafka-topic-in-Spark-Streaming-tp23442.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>> 
>> -
>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>> For additional commands, e-mail: user-h...@spark.apache.org
> 


Re: Help optimising Spark SQL query

2015-06-23 Thread James Aley
Thanks for the suggestions everyone, appreciate the advice.

I tried replacing DISTINCT for the nested GROUP BY, running on 1.4 instead
of 1.3, replacing the date casts with a "between" operation on the
corresponding long constants instead and changing COUNT(*) to COUNT(1).
None of these seem to have made any remarkable difference in running time
for the query.

I'll hook up YourKit and see if we can figure out where the CPU time is
going, then post back.

On 22 June 2015 at 16:01, Yin Huai  wrote:

> Hi James,
>
> Maybe it's the DISTINCT causing the issue.
>
> I rewrote the query as follows. Maybe this one can finish faster.
>
> select
>   sum(cnt) as uses,
>   count(id) as users
> from (
>   select
> count(*) cnt,
> cast(id as string) as id
>   from usage_events
>   where
> from_unixtime(cast(timestamp_millis/1000 as bigint)) between
> '2015-06-09' and '2015-06-16'
>   group by cast(id as string)
> ) tmp
>
> Thanks,
>
> Yin
>
> On Mon, Jun 22, 2015 at 12:55 PM, Jörn Franke 
> wrote:
>
>> Generally (not only spark sql specific) you should not cast in the where
>> part of a sql query. It is also not necessary in your case. Getting rid of
>> casts in the whole query will be also beneficial.
>>
>> On Mon, 22 Jun 2015 at 17:29, James Aley  wrote:
>>
>>> Hello,
>>>
>>> A colleague of mine ran the following Spark SQL query:
>>>
>>> select
>>>   count(*) as uses,
>>>   count (distinct cast(id as string)) as users
>>> from usage_events
>>> where
>>>   from_unixtime(cast(timestamp_millis/1000 as bigint))
>>> between '2015-06-09' and '2015-06-16'
>>>
>>> The table contains billions of rows, but totals only 64GB of data across
>>> ~30 separate files, which are stored as Parquet with LZO compression in S3.
>>>
>>> From the referenced columns:
>>>
>>> * id is Binary, which we cast to a String so that we can DISTINCT by
>>> it. (I was already told this will improve in a later release, in a separate
>>> thread.)
>>> * timestamp_millis is a long, containing a unix timestamp with
>>> millisecond resolution
>>>
>>> This took nearly 2 hours to run on a 5 node cluster of r3.xlarge EC2
>>> instances, using 20 executors, each with 4GB memory. I can see from
>>> monitoring tools that the CPU usage is at 100% on all nodes, but incoming
>>> network seems a bit low at 2.5MB/s, suggesting to me that this is CPU-bound.
>>>
>>> Does that seem slow? Can anyone offer any ideas by glancing at the query
>>> as to why this might be slow? We'll profile it meanwhile and post back if
>>> we find anything ourselves.
>>>
>>> A side issue - I've found that this query, and others, sometimes
>>> completes but doesn't return any results. There appears to be no error that
>>> I can see in the logs, and Spark reports the job as successful, but the
>>> connected JDBC client (SQLWorkbenchJ in this case), just sits there forever
>>> waiting. I did a quick Google and couldn't find anyone else having similar
>>> issues.
>>>
>>>
>>> Many thanks,
>>>
>>> James.
>>>
>>
>


How to disable parquet schema merging in 1.4?

2015-06-23 Thread Rex Xiong
I remember in a previous PR, schema merging can be disabled by
setting spark.sql.hive.convertMetastoreParquet.mergeSchema to false.
But in 1.4 release, I don't see this config anymore, is there a new way to
do it?

Thanks


Re: Velox Model Server

2015-06-23 Thread Sean Owen
Yes, and typically needs are <100ms. Now imagine even 10 concurrent
requests. My experience has been that this approach won't nearly
scale. The best you could probably do is async mini-batch
near-real-time scoring, pushing results to some store for retrieval,
which could be entirely suitable for your use case.

On Tue, Jun 23, 2015 at 8:52 AM, Nick Pentreath
 wrote:
> If your recommendation needs are real-time (<1s) I am not sure job server
> and computing the refs with spark will do the trick (though those new
> BLAS-based methods may have given sufficient speed up).

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Multiple executors writing file using java filewriter

2015-06-23 Thread Akhil Das
Then apply a transformation over the DStream to pull out just the required
information. :)
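
For example, a hedged sketch along those lines, reusing the variables from the
quoted snippet below (the HDFS path is made up, and msgeditor is the original
poster's own helper, assumed to be serializable): map the stream down to just
the fields to record, then let each executor write its own part files instead
of collecting to the driver.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

JavaDStream<String> auditLines = newinputStream.map(new Function<String, String>() {
    @Override
    public String call(String s) throws Exception {
        return System.currentTimeMillis() + ",spout-MSGID," + msgeditor.getMessageId(s);
    }
});

auditLines.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time time) throws Exception {
        if (!rdd.isEmpty()) {                              // skip empty micro-batches
            rdd.saveAsTextFile("hdfs:///logs/spout-msgids-" + time.milliseconds());
        }
        return null;
    }
});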

Thanks
Best Regards

On Tue, Jun 23, 2015 at 3:22 PM, anshu shukla 
wrote:

> Thanks alot ,
>
> Because i just want to log timestamp and  unique message id and not full
> RDD .
>
> On Tue, Jun 23, 2015 at 12:41 PM, Akhil Das 
> wrote:
>
>> Why don't you do a normal .saveAsTextFiles?
>>
>> Thanks
>> Best Regards
>>
>> On Mon, Jun 22, 2015 at 11:55 PM, anshu shukla 
>> wrote:
>>
>>> Thanx for reply !!
>>>
>>> YES , Either it should write on  any machine of cluster or Can  you
>>> please help me ...   that how to do  this . Previously i was using  writing
>>>  using  collect () , so some of my tuples are missing while writing.
>>>
>>> //previous logic that was just  creating  the  file on master -
>>>
>>>  newinputStream.foreachRDD(new Function2, Time, Void>()
>>> {
>>>
>>> @Override
>>> public Void call(JavaRDD v1, Time v2) throws Exception {
>>> for(String s:v1.collect()) {
>>> //System.out.println("v1 here is " + v1 + "---" + s);
>>> spoutlog.batchLogwriter(System.currentTimeMillis(), 
>>> "spout-MSGID," + msgeditor.getMessageId(s));
>>> //System.out.println(msgeditor.getMessageId(s));
>>> }
>>> return null;
>>> }
>>> });
>>>
>>>
>>>
>>>
>>> On Mon, Jun 22, 2015 at 11:31 PM, Richard Marscher <
>>> rmarsc...@localytics.com> wrote:
>>>
 Is spoutLog just a non-spark file writer? If you run that in the map
 call on a cluster its going to be writing in the filesystem of the executor
 its being run on. I'm not sure if that's what you intended.

 On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla 
 wrote:

> Running perfectly in local system but not writing to file in cluster mode 
> .ANY suggestions please ..
>
>
> //msgid is long counter
>
> JavaDStream  newinputStream=inputStream.map(new Function String>() {
> @Override
> public String call(String v1) throws Exception {
> String s1=msgId+"@"+v1;
> System.out.println(s1);
> msgId++;
> try {
> *//filewriter logic
> spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + 
> msgeditor.getMessageId(s1));*
> } catch (Exception e) {
>
> System.out.println("exeception is here");
> e.printStackTrace();
> throw e;
> }
> System.out.println("msgid,"+msgId);
> return  msgeditor.addMessageId(v1,msgId);
> }
> });
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>
> On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla  > wrote:
>
>> Can not we  write some data to a txt file  in parallel with multiple
>> executors  running  in parallel ??
>>
>>
>> --
>> Thanks & Regards,
>> Anshu Shukla
>>
>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


>>>
>>>
>>> --
>>> Thanks & Regards,
>>> Anshu Shukla
>>>
>>
>>
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>


Re: Multiple executors writing file using java filewriter

2015-06-23 Thread anshu shukla
Thanks a lot.

I just want to log the timestamp and a unique message id, not the full
RDD.

On Tue, Jun 23, 2015 at 12:41 PM, Akhil Das 
wrote:

> Why don't you do a normal .saveAsTextFiles?
>
> Thanks
> Best Regards
>
> On Mon, Jun 22, 2015 at 11:55 PM, anshu shukla 
> wrote:
>
>> Thanx for reply !!
>>
>> YES , Either it should write on  any machine of cluster or Can  you
>> please help me ...   that how to do  this . Previously i was using  writing
>>  using  collect () , so some of my tuples are missing while writing.
>>
>> //previous logic that was just  creating  the  file on master -
>>
>>  newinputStream.foreachRDD(new Function2, Time, Void>() {
>>
>> @Override
>> public Void call(JavaRDD v1, Time v2) throws Exception {
>> for(String s:v1.collect()) {
>> //System.out.println("v1 here is " + v1 + "---" + s);
>> spoutlog.batchLogwriter(System.currentTimeMillis(), 
>> "spout-MSGID," + msgeditor.getMessageId(s));
>> //System.out.println(msgeditor.getMessageId(s));
>> }
>> return null;
>> }
>> });
>>
>>
>>
>>
>> On Mon, Jun 22, 2015 at 11:31 PM, Richard Marscher <
>> rmarsc...@localytics.com> wrote:
>>
>>> Is spoutLog just a non-spark file writer? If you run that in the map
>>> call on a cluster its going to be writing in the filesystem of the executor
>>> its being run on. I'm not sure if that's what you intended.
>>>
>>> On Mon, Jun 22, 2015 at 1:35 PM, anshu shukla 
>>> wrote:
>>>
 Running perfectly in local system but not writing to file in cluster mode 
 .ANY suggestions please ..


 //msgid is long counter

 JavaDStream  newinputStream=inputStream.map(new Function>>> String>() {
 @Override
 public String call(String v1) throws Exception {
 String s1=msgId+"@"+v1;
 System.out.println(s1);
 msgId++;
 try {
 *//filewriter logic
 spoutlog.batchLogwriter(System.currentTimeMillis(), "spout-MSGID," + 
 msgeditor.getMessageId(s1));*
 } catch (Exception e) {

 System.out.println("exeception is here");
 e.printStackTrace();
 throw e;
 }
 System.out.println("msgid,"+msgId);
 return  msgeditor.addMessageId(v1,msgId);
 }
 });


 --
 Thanks & Regards,
 Anshu Shukla

 On Mon, Jun 22, 2015 at 10:50 PM, anshu shukla 
 wrote:

> Can not we  write some data to a txt file  in parallel with multiple
> executors  running  in parallel ??
>
>
> --
> Thanks & Regards,
> Anshu Shukla
>



 --
 Thanks & Regards,
 Anshu Shukla

>>>
>>>
>>
>>
>> --
>> Thanks & Regards,
>> Anshu Shukla
>>
>
>


-- 
Thanks & Regards,
Anshu Shukla


RE: Web UI vs History Server Bugs

2015-06-23 Thread Evo Eftimov
Probably your application crashed or was terminated without invoking the stop
method of the Spark context. In such cases it doesn't create the empty flag
file which tells the history server that it can safely show the log data.
Simply look in some of the other directories used by the history server to
see what the name of the flag file is, then create it manually in the
directories of the missing apps, and they will appear in the history server UI.

 

From: Steve Loughran [mailto:ste...@hortonworks.com] 
Sent: Monday, June 22, 2015 7:22 PM
To: Jonathon Cai
Cc: user@spark.apache.org
Subject: Re: Web UI vs History Server Bugs

 

well, I'm afraid you've reached the limits of my knowledge ... hopefully
someone else can answer 

 

On 22 Jun 2015, at 16:37, Jonathon Cai  wrote:

 

No, what I'm seeing is that while the cluster is running, I can't see the
app info after the app is completed. That is to say, when I click on the
application name on master:8080, no info is shown. However, when I examine
the same file on the History Server, the application information opens fine.

 

On Sat, Jun 20, 2015 at 6:47 AM, Steve Loughran 
wrote:


> On 17 Jun 2015, at 19:10, jcai  wrote:
>
> Hi,
>
> I am running this on Spark stand-alone mode. I find that when I examine
the
> web UI, a couple bugs arise:
>
> 1. There is a discrepancy between the number denoting the duration of the
> application when I run the history server and the number given by the web
UI
> (default address is master:8080). I checked more specific details,
including
> task and stage durations (when clicking on the application), and these
> appear to be the same for both avenues.
>
> 2. Sometimes the web UI on master:8080 is unable to display more specific
> information for an application that has finished (when clicking on the
> application), even when there is a log file in the appropriate directory.
> But when the history server is opened, it is able to read this file and
> output information.
>

There's a JIRA open on the history server caching incomplete work...if you
click on the link to a job while it's in progress, you don't get any updates
later.

does this sound like what you are seeing?

 

 



Re: s3 - Can't make directory for path

2015-06-23 Thread Steve Loughran

> On 23 Jun 2015, at 00:09, Danny  wrote:
> 
> hi,
> 
> have you tested 
> 
> "s3://ww-sandbox/name_of_path/" instead of "s3://ww-sandbox/name_of_path"
> 

+ make sure the bucket is there already. Hadoop s3 clients don't currently 
handle that step

> or have you tried adding your file extension with a placeholder (*), like:
> 
> "s3://ww-sandbox/name_of_path/*.gz"
> 
> or
> 
> "s3://ww-sandbox/name_of_path/*.csv" 
> 
> depending on your files. If it does not work, please test with the new "s3a"
> protocol of Spark/Hadoop:
> 
> https://issues.apache.org/jira/browse/HADOOP-10400


...but don't expect performance or scalability in Hadoop 2.6; Hadoop 2.7 has 
the fixes needed for production use, as does CDH5.4
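
As a hedged illustration of the s3a route (the bucket name and credentials below
are placeholders, and assume Hadoop 2.6+ client jars on the classpath):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class S3ARead {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("s3a-read"));
        // Placeholder credentials; these are the standard s3a Hadoop configuration keys.
        sc.hadoopConfiguration().set("fs.s3a.access.key", "YOUR_ACCESS_KEY");
        sc.hadoopConfiguration().set("fs.s3a.secret.key", "YOUR_SECRET_KEY");
        // The bucket must already exist; Hadoop's S3 clients won't create it for you.
        JavaRDD<String> lines = sc.textFile("s3a://ww-sandbox/name_of_path/*.gz");
        System.out.println("Line count: " + lines.count());
        sc.stop();
    }
}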

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Calculating tuple count /input rate with time

2015-06-23 Thread anshu shukla
I am calculating the input rate using the following logic.

And I think this foreachRDD is always running on the driver (the printlns are
seen on the driver).

1 - Is there any other way to do this at a lower cost?

2 - Will this give me the correct count for the rate?


//code -

inputStream.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
System.out.println(System.currentTimeMillis() + ",spoutstringJavaRDD,"
+ stringJavaRDD.count());
return null;
}
});
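
For comparison, a minimal sketch using the two-argument foreachRDD overload, which
hands you the batch Time so the logged rate lines up with the batch boundary rather
than a wall-clock call on the driver. It assumes inputStream is the
JavaDStream<String> above; count() still runs in parallel on the executors, and
only the println happens on the driver.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.streaming.Time;

// inputStream is assumed to be the JavaDStream<String> from the code above.
inputStream.foreachRDD(new Function2<JavaRDD<String>, Time, Void>() {
    @Override
    public Void call(JavaRDD<String> rdd, Time batchTime) throws Exception {
        long count = rdd.count(); // computed in parallel on the executors
        System.out.println(batchTime.milliseconds() + ",spoutstringJavaRDD," + count);
        return null;
    }
});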



-- 
Thanks & Regards,
Anshu Shukla


RE: MLLIB - Storing the Trained Model

2015-06-23 Thread Yang, Yuhao
Hi Samsudhin,

  If possible, can you please provide a part of the code? Or perhaps try the
unit tests in RandomForestSuite to see if the issue reproduces.
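
In the meantime, a minimal sketch of the save/load round trip under discussion
(Spark 1.3+ MLlib); the tiny in-memory dataset, the training parameters and the
/tmp path are placeholders, not the original code.

import java.util.Arrays;
import java.util.HashMap;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.mllib.linalg.Vectors;
import org.apache.spark.mllib.regression.LabeledPoint;
import org.apache.spark.mllib.tree.RandomForest;
import org.apache.spark.mllib.tree.model.RandomForestModel;

public class RFSaveLoad {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("rf-save-load"));
        // Placeholder training data - two labeled points just to get a model object.
        JavaRDD<LabeledPoint> data = jsc.parallelize(Arrays.asList(
            new LabeledPoint(0.0, Vectors.dense(0.0, 1.0)),
            new LabeledPoint(1.0, Vectors.dense(1.0, 0.0))));
        RandomForestModel model = RandomForest.trainClassifier(
            data, 2, new HashMap<Integer, Integer>(), 3, "auto", "gini", 4, 32, 12345);
        model.save(jsc.sc(), "/tmp/myModel");                           // writes metadata + Parquet data
        RandomForestModel sameModel = RandomForestModel.load(jsc.sc(), "/tmp/myModel");
        System.out.println(sameModel.toDebugString());
        jsc.stop();
    }
}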

Regards,
yuhao

-Original Message-
From: samsudhin [mailto:samsud...@pigstick.com] 
Sent: Tuesday, June 23, 2015 2:14 PM
To: user@spark.apache.org
Subject: MLLIB - Storing the Trained Model

Hi All,

I was trying to store a trained model to the local hard disk. I am able to save
it using the save() function, but when I try to retrieve the stored model using the
load() function I end up with the following error. Kindly help me with this.

scala> val sameModel = 
scala> RandomForestModel.load(sc,"/home/ec2-user/myModel")
15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(255260) called with 
curMem=592097, maxMem=278302556
15/06/23 02:04:25 INFO MemoryStore: Block broadcast_6 stored as values in 
memory (estimated size 249.3 KB, free 264.6 MB)
15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(36168) called with 
curMem=847357, maxMem=278302556
15/06/23 02:04:25 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in 
memory (estimated size 35.3 KB, free 264.6 MB)
15/06/23 02:04:25 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 
localhost:42290 (size: 35.3 KB, free: 265.3 MB)
15/06/23 02:04:25 INFO BlockManagerMaster: Updated info of block
broadcast_6_piece0
15/06/23 02:04:25 INFO SparkContext: Created broadcast 6 from textFile at
modelSaveLoad.scala:125
15/06/23 02:04:25 INFO FileInputFormat: Total input paths to process : 1
15/06/23 02:04:25 INFO SparkContext: Starting job: first at
modelSaveLoad.scala:125
15/06/23 02:04:25 INFO DAGScheduler: Got job 3 (first at
modelSaveLoad.scala:125) with 1 output partitions (allowLocal=true)
15/06/23 02:04:25 INFO DAGScheduler: Final stage: Stage 3(first at
modelSaveLoad.scala:125)
15/06/23 02:04:25 INFO DAGScheduler: Parents of final stage: List()
15/06/23 02:04:25 INFO DAGScheduler: Missing parents: List()
15/06/23 02:04:25 INFO DAGScheduler: Submitting Stage 3 
(/home/ec2-user/myModel/metadata MapPartitionsRDD[7] at textFile at 
modelSaveLoad.scala:125), which has no missing parents
15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(2680) called with 
curMem=883525, maxMem=278302556
15/06/23 02:04:25 INFO MemoryStore: Block broadcast_7 stored as values in 
memory (estimated size 2.6 KB, free 264.6 MB)
15/06/23 02:04:25 INFO MemoryStore: ensureFreeSpace(1965) called with 
curMem=886205, maxMem=278302556
15/06/23 02:04:25 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in 
memory (estimated size 1965.0 B, free 264.6 MB)
15/06/23 02:04:25 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 
localhost:42290 (size: 1965.0 B, free: 265.3 MB)
15/06/23 02:04:25 INFO BlockManagerMaster: Updated info of block
broadcast_7_piece0
15/06/23 02:04:25 INFO SparkContext: Created broadcast 7 from broadcast at
DAGScheduler.scala:839
15/06/23 02:04:25 INFO DAGScheduler: Submitting 1 missing tasks from Stage 3 
(/home/ec2-user/myModel/metadata MapPartitionsRDD[7] at textFile at
modelSaveLoad.scala:125)
15/06/23 02:04:25 INFO TaskSchedulerImpl: Adding task set 3.0 with 1 tasks
15/06/23 02:04:25 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 3, 
localhost, PROCESS_LOCAL, 1311 bytes)
15/06/23 02:04:25 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
15/06/23 02:04:25 INFO HadoopRDD: Input split:
file:/home/ec2-user/myModel/metadata/part-0:0+97
15/06/23 02:04:25 INFO Executor: Finished task 0.0 in stage 3.0 (TID 3).
1989 bytes result sent to driver
15/06/23 02:04:25 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID
3) in 10 ms on localhost (1/1)
15/06/23 02:04:25 INFO DAGScheduler: Stage 3 (first at
modelSaveLoad.scala:125) finished in 0.010 s
15/06/23 02:04:25 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have 
all completed, from pool
15/06/23 02:04:25 INFO DAGScheduler: Job 3 finished: first at 
modelSaveLoad.scala:125, took 0.016193 s
15/06/23 02:04:25 WARN FSInputChecker: Problem opening checksum file:
file:/home/ec2-user/myModel/data/_temporary/0/_temporary/attempt_201506230149_0027_r_01_0/part-r-2.parquet.
 
Ignoring exception:
java.io.EOFException
at java.io.DataInputStream.readFully(DataInputStream.java:197)
at java.io.DataInputStream.readFully(DataInputStream.java:169)
at
org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.(ChecksumFileSystem.java:149)
at
org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:341)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
at
parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:402)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:298)
at
org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:297)
at
scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
at
sc

How to figure out how many records received by individual receiver

2015-06-23 Thread bit1...@163.com
Hi,
I am using Spark 1.3.1 and have 2 receivers.
On the web UI, I can only see the total records received by these 2
receivers, but I can't figure out the records received by each individual receiver.
Not sure whether that information is shown on the UI in Spark 1.4.





bit1...@163.com


Re: jars are not loading from 1.3. those set via setJars to the SparkContext

2015-06-23 Thread Tathagata Das
This could be because of some subtle change in the classloaders used by
executors. I think there have been issues in the past with libraries that
use Class.forName to find classes by reflection. Because the executors load
classes dynamically using custom class loaders, libraries that use
Class.forName do not use the right class loader - the custom class
loader that holds the dynamically loaded classes.

One workaround is to add the relevant library to the Spark conf
spark.executor.extraClassPath and see if it works. Make sure that the
library is already present on the worker machines at the given path.
This should start the executor with the library already present in the
initial classpath and therefore present in the system classloader. Then
Class.forName would probably find it.
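
As a hedged illustration of that workaround (the master URL and the jar path are
placeholders; the jar must already exist at that path on every worker machine):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class ExtraClasspathExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("kafka-classloading-workaround")
            .setMaster("spark://master:7077")                        // standalone master URL (placeholder)
            .set("spark.executor.extraClassPath", "/opt/libs/my-kafka-encoders.jar"); // placeholder path
        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... build the streaming job as before ...
        sc.stop();
    }
}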

TD

On Tue, Jun 23, 2015 at 12:14 AM, Murthy Chelankuri 
wrote:

> Yes, in Spark standalone mode with the master URL.
>
> Jars are copied to the executors and the application is running fine, but it is
> failing at the point where Kafka tries to load the classes using some
> reflection mechanisms for loading the Encoder and Partitioner classes.
>
> Here are my findings so far on this issue.
>
> In the driver app, if any module is trying to load a class using the
> class loader (via some reflection) it's not able to find the class. This
> used to work in 1.2.0; not sure why it's not working with 1.3.0.
>
> Is there any way we can make the driver use the Spark executor
> classloader for loading the classes, or something like that?
>
>
> On Tue, Jun 23, 2015 at 12:28 PM, Tathagata Das 
> wrote:
>
>> So you have Kafka in your classpath in your Java application, where you
>> are creating the SparkContext with the Spark standalone master URL, right?
>>
>> The recommended way of submitting spark applications to any cluster is
>> using spark-submit. See
>> https://spark.apache.org/docs/latest/submitting-applications.html. This
>> takes care of sending all the libraries to the cluster workers so that they
>> can be found.
>>
>> Please try that.
>>
>> On Mon, Jun 22, 2015 at 11:50 PM, Murthy Chelankuri 
>> wrote:
>>
>>> I am invoking it from the java application by creating the sparkcontext
>>>
>>> On Tue, Jun 23, 2015 at 12:17 PM, Tathagata Das 
>>> wrote:
>>>
 How are you adding that to the classpath? Through spark-submit or
 otherwise?

 On Mon, Jun 22, 2015 at 5:02 PM, Murthy Chelankuri 
 wrote:

> Yes, I have the producer in the classpath. And I am using it in
> standalone mode.
>
> Sent from my iPhone
>
> On 23-Jun-2015, at 3:31 am, Tathagata Das  wrote:
>
> Do you have the Kafka producer in your classpath? If so, how are you adding
> that library? Are you running on YARN, Mesos, Standalone, or local?
> These details will be very useful.
>
> On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri  > wrote:
>
>> I am using Spark Streaming. What I am trying to do is send a few
>> messages to some Kafka topic, and that is where it's failing.
>>
>> java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:264)
>> at kafka.utils.Utils$.createObject(Utils.scala:438)
>> at kafka.producer.Producer.(Producer.scala:61)
>>
>> On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri <
>> kmurt...@gmail.com> wrote:
>>
>>> I have been using Spark for the last 6 months with version
>>> 1.2.0.
>>>
>>> I am trying to migrate to 1.3.0, but the same program I have
>>> written is not working.
>>>
>>> It's giving a class-not-found error when I try to load some dependent
>>> jars from the main program.
>>>
>>> This used to work in 1.2.0 when I set all the dependent jars as an array on
>>> the Spark context, but it is not working in 1.3.0.
>>>
>>>
>>> Please help me resolve this.
>>>
>>>
>>> Thanks,
>>> Murthy Chelankuri
>>>
>>
>>
>

>>>
>>
>


Re: Re: What does [Stage 0:> (0 + 2) / 2] mean on the console

2015-06-23 Thread bit1...@163.com
Hi, Akhil, 
Thank you for the explanation!



bit1...@163.com
 
From: Akhil Das
Date: 2015-06-23 16:29
To: bit1...@163.com
CC: user
Subject: Re: What does [Stage 0:> (0 + 2) / 2] mean on the console
Well, you could say that the (Stage ...) information is an ASCII representation of the
WebUI (running on port 4040). Since you set local[4] you will have 4 threads
for your computation, and since you have 2 receivers, you are left with 2
threads to process ((0 + 2) <-- this 2 is your 2 threads). And the other /2
means you have 2 tasks in that stage (with id 0).

Thanks
Best Regards

On Tue, Jun 23, 2015 at 1:21 PM, bit1...@163.com  wrote:
Hi,

I have a spark streaming application that runs locally with two receivers, some 
code snippet is as follows:

conf.setMaster("local[4]")

//RPC Log Streaming
val rpcStream = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](ssc, consumerParams, topicRPC, StorageLevel.MEMORY_ONLY) 
RPCLogStreamProcessor.process(rpcStream, taskConfBroadcast) 

//HTTP Log Streaming 
val httpStream = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](ssc, consumerParams, topicHTTP, StorageLevel.MEMORY_ONLY) 
HttpLogStreamProcessor.process(httpStream, taskConfBroadcast)


There is a piece of log information showing on the console in red color:
[Stage 0:> (0 + 2) / 2]
It appears, then disappears, then appears and disappears again...

For the above code, if I only have the RPC streaming and comment out the httpStream,
then it disappears. I don't know why it occurs or how to suppress it.



bit1...@163.com



Re: What does [Stage 0:> (0 + 2) / 2] mean on the console

2015-06-23 Thread Akhil Das
Well, you could say that the (Stage ...) information is an ASCII representation of the
WebUI (running on port 4040). Since you set local[4] you will have 4
threads for your computation, and since you have 2 receivers, you are
left with 2 threads to process ((0 + 2) <-- this 2 is your 2 threads). And
the other /2 means you have 2 tasks in that stage (with id 0).

Thanks
Best Regards

On Tue, Jun 23, 2015 at 1:21 PM, bit1...@163.com  wrote:

> Hi,
>
> I have a spark streaming application that runs locally with two receivers,
> some code snippet is as follows:
>
> conf.setMaster("local[4]")
>
> //RPC Log Streaming
> val rpcStream = KafkaUtils.createStream[String, String, StringDecoder,
> StringDecoder](ssc, consumerParams, topicRPC, StorageLevel.MEMORY_ONLY)
> RPCLogStreamProcessor.process(rpcStream, taskConfBroadcast)
>
> //HTTP Log Streaming
> val httpStream = KafkaUtils.createStream[String, String, StringDecoder,
> StringDecoder](ssc, consumerParams, topicHTTP, StorageLevel.MEMORY_ONLY)
> HttpLogStreamProcessor.process(httpStream, taskConfBroadcast)
>
>
> There is a piece of log information showing on the console in red color:
> [Stage 0:> (0 + 2) / 2]
> It appears, then disappears, then appears and disappears again...
>
> For the above code, if I only have the RPC streaming and comment out the
> httpStream, then it disappears. I don't know why it occurs or how to
> suppress it.
>
> --
> bit1...@163.com
>


What does [Stage 0:> (0 + 2) / 2] mean on the console

2015-06-23 Thread bit1...@163.com
Hi,

I have a spark streaming application that runs locally with two receivers, some 
code snippet is as follows:

conf.setMaster("local[4]")

//RPC Log Streaming
val rpcStream = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](ssc, consumerParams, topicRPC, StorageLevel.MEMORY_ONLY) 
RPCLogStreamProcessor.process(rpcStream, taskConfBroadcast) 

//HTTP Log Streaming 
val httpStream = KafkaUtils.createStream[String, String, StringDecoder, 
StringDecoder](ssc, consumerParams, topicHTTP, StorageLevel.MEMORY_ONLY) 
HttpLogStreamProcessor.process(httpStream, taskConfBroadcast)


There is a piece of log information showing on the console in red color:
[Stage 0:> (0 + 2) / 2]
It appears, then disappears, then appears and disappears again...

For the above code, if I only have the RPC streaming and comment out the httpStream,
then it disappears. I don't know why it occurs or how to suppress it.



bit1...@163.com


Re: Any way to retrieve time of message arrival to Kafka topic, in Spark Streaming?

2015-06-23 Thread Akhil Das
Maybe while producing the messages you can make each one a KeyedMessage with
the timestamp as the key; on the consumer end you can easily read the
key (which will be the timestamp) from the message. If the network is fast
enough, then I think there would only be a small millisecond lag.
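
A hedged sketch of the producer side (Kafka 0.8.x API; the broker, topic and
payload are placeholders). On the Spark side, the key then shows up as the first
element of the (key, message) tuples returned by KafkaUtils.createStream.

import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class TimestampedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092");            // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(props));
        // Carry the send time (as seen by the producer) in the message key.
        String key = Long.toString(System.currentTimeMillis());
        producer.send(new KeyedMessage<String, String>("mytopic", key, "some payload"));
        producer.close();
    }
}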

Thanks
Best Regards

On Tue, Jun 23, 2015 at 10:22 AM, dgoldenberg 
wrote:

> Is there any way to retrieve the time of each message's arrival into a
> Kafka
> topic, when streaming in Spark, whether with receiver-based or direct
> streaming?
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Any-way-to-retrieve-time-of-message-arrival-to-Kafka-topic-in-Spark-Streaming-tp23442.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Programming with java on spark

2015-06-23 Thread Akhil Das
Did you happen to try this?


JavaPairRDD<LongWritable, Text> hadoopFile = sc.hadoopFile(
"/sigmoid", DataInputFormat.class, LongWritable.class,
Text.class);
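
And since the original question below uses the new Hadoop API, a hedged sketch of
the newAPIHadoopFile variant; it assumes DataInputFormat (from the question)
extends org.apache.hadoop.mapreduce.InputFormat<LongWritable, Text>. Per the note
quoted below, copy the Writables with a map() before caching or aggregating,
because Hadoop re-uses the same objects.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class NewApiHadoopFileExample {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("custom-input-format"));
        Configuration confHadoop = new Configuration();
        // DataInputFormat is the custom new-API InputFormat from the question.
        JavaPairRDD<LongWritable, Text> distFile = sc.newAPIHadoopFile(
            "hdfs://cMaster:9000/wcinput/data.txt",
            DataInputFormat.class, LongWritable.class, Text.class, confHadoop);
        System.out.println("Records: " + distFile.count());
        sc.stop();
    }
}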



Thanks
Best Regards

On Tue, Jun 23, 2015 at 6:58 AM, 付雅丹  wrote:

> Hello, everyone! I'm new to Spark. I have already written programs in
> Hadoop 2.5.2, where I defined my own InputFormat and OutputFormat. Now I
> want to move my code to Spark using the Java language. The first problem I
> encountered is how to transform a big txt file in local storage into an RDD that
> is compatible with my program written in Hadoop. I found that there are
> functions in SparkContext which may be helpful, but I don't know how to use
> them.
> E.G.
>
> public <K, V, F extends org.apache.hadoop.mapreduce.InputFormat<K, V>>
> RDD<scala.Tuple2<K, V>> newAPIHadoopFile(String path,
>Class<F> fClass,
>Class<K> kClass,
>Class<V> vClass,
>org.apache.hadoop.conf.Configuration conf)
>
> Get an RDD for a given Hadoop file with an arbitrary new API InputFormat
> and extra configuration options to pass to the input format.
>
> '''Note:''' Because Hadoop's RecordReader class re-uses the same Writable
> object for each record, directly caching the returned RDD or directly
> passing it to an aggregation or shuffle operation will create many
> references to the same object. If you plan to directly cache, sort, or
> aggregate Hadoop writable objects, you should first copy them using a map
>  function.
> In java, the following is wrong.
>
> /option one
> Configuration confHadoop = new Configuration();
> JavaPairRDD distFile=sc.newAPIHadoopFile(
> "hdfs://cMaster:9000/wcinput/data.txt",
> DataInputFormat,LongWritable,Text,confHadoop);
>
> /option two
> Configuration confHadoop = new Configuration();
> DataInputFormat input=new DataInputFormat();
> LongWritable longType=new LongWritable();
> Text text=new Text();
> JavaPairRDD distFile=sc.newAPIHadoopFile(
> "hdfs://cMaster:9000/wcinput/data.txt",
> input,longType,text,confHadoop);
>
> Can anyone help me? Thank you so much.
>
>


Re: Spark job fails silently

2015-06-23 Thread Akhil Das
Looks like a hostname conflict to me.


15/06/22 17:04:45 WARN Utils: Your hostname, datasci01.dev.abc.com resolves
to a loopback address: 127.0.0.1; using 10.0.3.197 instead (on interface
eth0)
15/06/22 17:04:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
another address

Can you paste your /etc/hosts here?


Thanks
Best Regards

On Tue, Jun 23, 2015 at 2:40 AM, roy  wrote:

> Hi,
>
>Our spark job on yarn suddenly started failing silently without showing
> any error following is the trace.
>
>
> Using properties file: /usr/lib/spark/conf/spark-defaults.conf
> Adding default property:
> spark.serializer=org.apache.spark.serializer.KryoSerializer
> Adding default property:
>
> spark.executor.extraJavaOptions=-Dlog4j.configuration=file:///etc/spark/log4j.properties
> Adding default property: spark.eventLog.enabled=true
> Adding default property: spark.shuffle.service.enabled=true
> Adding default property:
> spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native
> Adding default property:
> spark.yarn.historyServer.address=http://ds-hnn002.dev.abc.com:18088
> Adding default property:
> spark.yarn.am.extraLibraryPath=/usr/lib/hadoop/lib/native
> Adding default property: spark.ui.showConsoleProgress=true
> Adding default property: spark.shuffle.service.port=7337
> Adding default property: spark.master=yarn-client
> Adding default property:
> spark.executor.extraLibraryPath=/usr/lib/hadoop/lib/native
> Adding default property:
> spark.eventLog.dir=hdfs://magnetic-hadoop-dev/user/spark/applicationHistory
> Adding default property:
>
> spark.yarn.jar=local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar
> Parsed arguments:
>   master  yarn
>   deployMode  null
>   executorMemory  3G
>   executorCores   null
>   totalExecutorCores  null
>   propertiesFile  /usr/lib/spark/conf/spark-defaults.conf
>   driverMemory4G
>   driverCores null
>   driverExtraClassPathnull
>   driverExtraLibraryPath  /usr/lib/hadoop/lib/native
>   driverExtraJavaOptions  null
>   supervise   false
>   queue   null
>   numExecutors30
>   files   null
>   pyFiles null
>   archivesnull
>   mainClass   null
>   primaryResource
> file:/home/jonathanarfa/code/updb/spark/updb2vw_testing.py
>   nameupdb2vw_testing.py
>   childArgs   [--date 2015-05-20]
>   jarsnull
>   packagesnull
>   repositoriesnull
>   verbose true
>
> Spark properties used, including those specified through
>  --conf and those from the properties file
> /usr/lib/spark/conf/spark-defaults.conf:
>   spark.executor.extraLibraryPath -> /usr/lib/hadoop/lib/native
>   spark.yarn.jar ->
>
> local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar
>   spark.driver.extraLibraryPath -> /usr/lib/hadoop/lib/native
>   spark.yarn.historyServer.address -> http://ds-hnn002.dev.abc.com:18088
>   spark.yarn.am.extraLibraryPath -> /usr/lib/hadoop/lib/native
>   spark.eventLog.enabled -> true
>   spark.ui.showConsoleProgress -> true
>   spark.serializer -> org.apache.spark.serializer.KryoSerializer
>   spark.executor.extraJavaOptions ->
> -Dlog4j.configuration=file:///etc/spark/log4j.properties
>   spark.shuffle.service.enabled -> true
>   spark.shuffle.service.port -> 7337
>   spark.eventLog.dir ->
> hdfs://magnetic-hadoop-dev/user/spark/applicationHistory
>   spark.master -> yarn-client
>
>
> Main class:
> org.apache.spark.deploy.PythonRunner
> Arguments:
> file:/home/jonathanarfa/code/updb/spark/updb2vw_testing.py
> null
> --date
> 2015-05-20
> System properties:
> spark.executor.extraLibraryPath -> /usr/lib/hadoop/lib/native
> spark.driver.memory -> 4G
> spark.executor.memory -> 3G
> spark.yarn.jar ->
>
> local:/usr/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.0-hadoop2.6.0-cdh5.4.0.jar
> spark.driver.extraLibraryPath -> /usr/lib/hadoop/lib/native
> spark.executor.instances -> 30
> spark.yarn.historyServer.address -> http://ds-hnn002.dev.abc.com:18088
> spark.yarn.am.extraLibraryPath -> /usr/lib/hadoop/lib/native
> spark.ui.showConsoleProgress -> true
> spark.eventLog.enabled -> true
> spark.yarn.dist.files ->
> file:/home/jonathanarfa/code/updb/spark/updb2vw_testing.py
> SPARK_SUBMIT -> true
> spark.serializer -> org.apache.spark.serializer.KryoSerializer
> spark.executor.extraJavaOptions ->
> -Dlog4j.configuration=file:///etc/spark/log4j.properties
> spark.shuffle.service.enabled -> true
> spark.app.name -> updb2vw_testing.py
> spark.shuffle.service.port -> 7337
> spark.eventLog.dir ->
> hdfs://magnetic-hadoop-dev/user/spark/applicationHistory
> spark.master -> yarn-client
> Classpath elements:
>
>
>
> spark.akka.frameSize=60
> spark.app.name=updb2vw_2015-05-20
> spark.driver.extraLibraryPath=/usr/lib/hadoop/lib/native
> spark.

Re: jars are not loading from 1.3. those set via setJars to the SparkContext

2015-06-23 Thread Murthy Chelankuri
Yes, in Spark standalone mode with the master URL.

Jars are copied to the executors and the application is running fine, but it is
failing at the point where Kafka tries to load the classes using some
reflection mechanisms for loading the Encoder and Partitioner classes.

Here are my findings so far on this issue.

In the driver app, if any module is trying to load a class using the class
loader (via some reflection) it's not able to find the class. This used
to work in 1.2.0; not sure why it's not working with 1.3.0.

Is there any way we can make the driver use the Spark executor
classloader for loading the classes, or something like that?


On Tue, Jun 23, 2015 at 12:28 PM, Tathagata Das  wrote:

> So you have Kafka in your classpath in your Java application, where you are
> creating the SparkContext with the Spark standalone master URL, right?
>
> The recommended way of submitting spark applications to any cluster is
> using spark-submit. See
> https://spark.apache.org/docs/latest/submitting-applications.html. This
> takes care of sending all the libraries to the cluster workers so that they
> can be found.
>
> Please try that.
>
> On Mon, Jun 22, 2015 at 11:50 PM, Murthy Chelankuri 
> wrote:
>
>> I am invoking it from the java application by creating the sparkcontext
>>
>> On Tue, Jun 23, 2015 at 12:17 PM, Tathagata Das 
>> wrote:
>>
>>> How are you adding that to the classpath? Through spark-submit or
>>> otherwise?
>>>
>>> On Mon, Jun 22, 2015 at 5:02 PM, Murthy Chelankuri 
>>> wrote:
>>>
 Yes, I have the producer in the classpath. And I am using it in standalone
 mode.

 Sent from my iPhone

 On 23-Jun-2015, at 3:31 am, Tathagata Das  wrote:

 Do you have the Kafka producer in your classpath? If so, how are you adding that
 library? Are you running on YARN, Mesos, Standalone, or local? These
 details will be very useful.

 On Mon, Jun 22, 2015 at 8:34 AM, Murthy Chelankuri 
 wrote:

> I am using Spark Streaming. What I am trying to do is send a few
> messages to some Kafka topic, and that is where it's failing.
>
> java.lang.ClassNotFoundException: com.abc.mq.msg.ObjectEncoder
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> at java.lang.Class.forName0(Native Method)
> at java.lang.Class.forName(Class.java:264)
> at kafka.utils.Utils$.createObject(Utils.scala:438)
> at kafka.producer.Producer.(Producer.scala:61)
>
> On Mon, Jun 22, 2015 at 8:24 PM, Murthy Chelankuri  > wrote:
>
>> I have been using Spark for the last 6 months with version
>> 1.2.0.
>>
>> I am trying to migrate to 1.3.0, but the same program I have
>> written is not working.
>>
>> It's giving a class-not-found error when I try to load some dependent
>> jars from the main program.
>>
>> This used to work in 1.2.0 when I set all the dependent jars as an array on
>> the Spark context, but it is not working in 1.3.0.
>>
>>
>> Please help me resolve this.
>>
>>
>> Thanks,
>> Murthy Chelankuri
>>
>
>

>>>
>>
>


  1   2   >