Re: Running 2 spark application in parallel

2015-10-22 Thread Simon Elliston Ball
If YARN has capacity to run both simultaneously, it will. You should ensure you 
are not allocating too many executors to the first app, and leave some space 
for the second.

You may want to run the applications on different YARN queues to control 
resource allocation. If you run them as different users within the same queue, 
you should also get an even split between the applications; however, you may 
need to enable preemption to ensure the first doesn't just hog the queue. 
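
To make the "leave some space for the second" point concrete, here is a rough 
sizing sketch in plain Scala. It is only a model: the case class, the helper 
and the one-extra-container allowance for the application master are 
assumptions for illustration, not YARN or Spark APIs. The inputs correspond to 
what one would pass to spark-submit as --num-executors, --executor-cores and 
--executor-memory.

```scala
// Hypothetical sizing model; names and numbers are illustrative, not YARN APIs.
final case class AppRequest(executors: Int, coresPerExecutor: Int, memPerExecutorMb: Int) {
  // Assumption: one extra 1-core / 1024 MB container for the application master.
  def totalCores: Int = executors * coresPerExecutor + 1
  def totalMemMb: Int = executors * memPerExecutorMb + 1024
}

object QueueFit {
  // True when all requests fit into the queue at the same time.
  def fitsTogether(queueCores: Int, queueMemMb: Int, apps: Seq[AppRequest]): Boolean =
    apps.map(_.totalCores).sum <= queueCores &&
      apps.map(_.totalMemMb).sum <= queueMemMb
}
```

In this model, a 24-core / 48 GB queue cannot hold a 10-executor app next to a 
4-executor app (2 cores and 4 GB per executor), so the second would wait, while 
two 4-executor apps fit side by side.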

Simon 

> On 22 Oct 2015, at 19:20, Suman Somasundar wrote:
> 
> Hi all,
>  
> Is there a way to run 2 spark applications in parallel under Yarn in the same 
> cluster?
>  
> Currently, if I submit 2 applications, one of them waits till the other one 
> is completed.
>  
> I want both of them to start and run at the same time.
>  
> Thanks,
> Suman.


Analyzing consecutive elements

2015-10-22 Thread Sampo Niskanen
Hi,

I have analytics data with timestamps on each element.  I'd like to analyze
consecutive elements using Spark, but haven't figured out how to do this.

Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E]
to an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze
time-related elements.)

How can this be achieved?
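
For reference, the transform being asked for can be sketched on a plain Scala 
Seq. This is a local model of the logic only, not a Spark job; the object and 
method names are made up for illustration. The second method mirrors an 
index-based join, which is the shape one could carry over to an RDD using 
zipWithIndex and join.

```scala
object ConsecutivePairs {
  // Pair each element with its successor: [A, B, C] -> [(A,B), (B,C)].
  def pairs[T](sorted: Seq[T]): Seq[(T, T)] =
    sorted.zip(sorted.drop(1))

  // Same result via an index join: key every element by its position,
  // shift a copy by one position, and match the two on the key.
  def pairsByIndex[T](sorted: Seq[T]): Seq[(T, T)] = {
    val indexed = sorted.zipWithIndex.map { case (v, i) => (i, v) }
    // Re-key each element by the index of its predecessor.
    val shifted = indexed.map { case (i, v) => (i - 1, v) }.toMap
    indexed.flatMap { case (i, v) => shifted.get(i).map(next => (v, next)) }
  }
}
```

Both methods return an empty result for inputs with fewer than two elements.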


*Sampo Niskanen*

*Lead developer / Wellmo*
sampo.niska...@wellmo.com
+358 40 820 5291


RE: Spark_1.5.1_on_HortonWorks

2015-10-22 Thread Sun, Rui
Frans,

SparkR runs with R 3.1+. If possible, the latest version of R is recommended.

From: Saisai Shao [mailto:sai.sai.s...@gmail.com]
Sent: Thursday, October 22, 2015 11:17 AM
To: Frans Thamura
Cc: Ajay Chander; Doug Balog; user spark mailing list
Subject: Re: Spark_1.5.1_on_HortonWorks

SparkR is shipped with the Hortonworks version of Spark 1.4.1. There's no 
difference compared to the community version, so you can refer to the Apache 
Spark docs. It would be better to ask HDP-related questions in the Hortonworks 
forum ( http://hortonworks.com/community/forums/forum/spark/ ). Sorry, I'm not 
very familiar with SparkR.

Thanks
Saisai

On Thu, Oct 22, 2015 at 11:02 AM, Frans Thamura wrote:
Talking about Spark in HDP:

Is there a reference for SparkR, and which version of R should we install?
--
Frans Thamura (曽志胜)
Java Champion
Shadow Master and Lead Investor
Meruvian.
Integrated Hypermedia Java Solution Provider.

Mobile: +628557888699
Blog: http://blogs.mervpolis.com/roller/flatburger (id)

FB: http://www.facebook.com/meruvian
TW: http://www.twitter.com/meruvian / @meruvian
Website: http://www.meruvian.org

"We grow because we share the same belief."

On Thu, Oct 22, 2015 at 8:56 AM, Saisai Shao wrote:
> How do you start the history server? Do you still use the history server of
> 1.3.1, or did you start the history server in 1.5.1?
>
> The Spark tarball you used is the community version, so the Application
> TimelineServer based history provider is not supported. You could comment
> out the "spark.history.provider" setting so that it uses the default
> FsHistoryProvider, or you could set "spark.history.provider" to
> "org.apache.spark.deploy.history.FsHistoryProvider".
>
> If you still want to use the ATS-based history server, you have to wait for
> the technical preview release from Hortonworks.
>
> Thanks
> Saisai
>
>
> On Thu, Oct 22, 2015 at 9:47 AM, Ajay Chander wrote:
>>
>> Hi Saisai,
>>
>> Thanks for your time. I have followed your inputs and downloaded
>> "spark-1.5.1-bin-hadoop2.6" on one of the nodes, say node1. When I ran the
>> Pi test everything seemed to work fine, except that the
>> spark-history-server running on node1 has gone down. It was complaining
>> about a missing class:
>>
>> 15/10/21 16:41:28 INFO HistoryServer: Registered signal handlers for
>> [TERM, HUP, INT]
>> 15/10/21 16:41:28 WARN SparkConf: The configuration key
>> 'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark 1.3
>> and and may be removed in the future. Please use the new key
>> 'spark.yarn.am.waitTime' instead.
>> 15/10/21 16:41:29 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/10/21 16:41:29 INFO SecurityManager: Changing view acls to: root
>> 15/10/21 16:41:29 INFO SecurityManager: Changing modify acls to: root
>> 15/10/21 16:41:29 INFO SecurityManager: SecurityManager: authentication
>> disabled; ui acls disabled; users with view permissions: Set(root); users
>> with modify permissions: Set(root)
>> Exception in thread "main" java.lang.ClassNotFoundException:
>> org.apache.spark.deploy.yarn.history.YarnHistoryProvider
>> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>> at java.lang.Class.forName0(Native Method)
>> at java.lang.Class.forName(Class.java:348)
>> at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
>> at
>> org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:231)
>> at
>> org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)
>>
>>
>> I went to the lib folder and noticed that
>> "spark-assembly-1.5.1-hadoop2.6.0.jar" is missing that class. I was able to
>> get the spark history server started with 1.3.1 but not 1.5.1. Any inputs on
>> this?
>>
>> Really appreciate your help. Thanks
>>
>> Regards,
>> Ajay
>>
>>
>>
>> On Wednesday, October 21, 2015, Saisai Shao wrote:
>>>
>>> Hi Ajay,
>>>
>>> You don't need to copy the tarball to all the nodes; the one node from
>>> which you want to run the Spark application is enough (usually the master
>>> node). YARN will help to distribute the Spark dependencies. The link I
>>> mentioned before is the one you could follow; please read my previous mail.
>>>
>>> Thanks
>>> Saisai
>>>
>>>
>>>
>>> On Thu, Oct 22, 2015 at 1:56 AM, Ajay Chander wrote:

 Thanks for your kind inputs. Right now I am running spark-1.3.1 on
 YARN(4 node cluster) on a HortonWorks distribution. 

Re: Kafka Streaming and Filtering > 3000 partitons

2015-10-22 Thread varun sharma
You can try something like this to filter by topic:

val kafkaStringStream = KafkaUtils.createDirectStream[...]

// You might want to create the stream from offsets fetched from ZK.

kafkaStringStream.foreachRDD { rdd =>
  // Distinct topic names present in this batch
  val topics = rdd.map(_._1).distinct().collect()
  if (topics.length > 0) {
    // Log a small sample of the batch
    val rdd_value = rdd.take(10).mkString("\n.\n")
    Logger.log(this.getClass, INFO, BaseSLog(s"Printing all feeds\n$rdd_value"))

    topics.foreach { topic =>
      // Keep only the payloads that belong to this topic
      val filteredRdd = rdd.collect { case (t, data) if t == topic => data }
      // Do anything with filteredRdd, like saving it to a data store
    }
    // Update the offsets
    ZookeeperManaager.updateOffsetsinZk(rdd)
  }
}
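
The per-topic filter itself (a map from topic name to the text a message must 
contain, as described further down the thread) can be sketched in plain Scala. 
This is a local model on (topic, message) pairs rather than on an RDD, and the 
object and method names are hypothetical:

```scala
object TopicFilters {
  // Filter table from the thread: topic name -> substring a message must contain.
  val topicFilters: Map[String, String] = Map(
    "topic1" -> "this text must match",
    "topic2" -> "this other text must match"
  )

  // Keep a record only if its topic has a filter and the message matches it.
  def keep(topic: String, message: String): Boolean =
    topicFilters.get(topic).exists(needle => message.contains(needle))

  // Group surviving messages by topic, e.g. to write each group to its own destination.
  def filterByTopic(records: Seq[(String, String)]): Map[String, Seq[String]] =
    records
      .filter { case (t, m) => keep(t, m) }
      .groupBy(_._1)
      .map { case (t, rs) => t -> rs.map(_._2) }
}
```

Topics without an entry in the table are dropped here; whether they should 
pass through instead is a design choice the thread does not settle.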

Regards

Varun


On Thu, Oct 22, 2015 at 2:44 AM, Cody Koeninger  wrote:

> Yeah, that's the general idea.
>
> Regarding the question in your code comments ... The code inside of
> foreachPartition is what's running on the executor.  It wouldn't make any
> sense to try to get a partition ID before that block.
>
> On Wed, Oct 21, 2015 at 4:07 PM, Dave Ariens 
> wrote:
>
>> Cody,
>>
>>
>>
>> First off--thanks for your contributions and blog post, which I actually
>> linked to in my original question. You'll have to forgive me as I've only
>> been using Spark and writing Scala for a few days. I'm aware that the RDD
>> partitions are 1:1 with Kafka topic partitions and you can get the offset
>> ranges.  But my understanding is that the below code would need to be
>> executed after the stream has been processed.
>>
>>
>>
>> Let's say we're storing our filters in a key value map where the key is
>> the topic name, and the value is a string that a message within a partition
>> of that topic must contain to match.
>>
>>
>>
>> Is this the approach you're suggesting (using your example code)?
>>
>>
>>
>> // This would get built up on the driver, likely fetching the topics and
>> // filters from ZK
>> val topicFilters = Map("topic1" -> "this text must match",
>>                        "topic2" -> "this other text must match")
>>
>> val stream = KafkaUtils.createDirectStream(...)
>> ...
>> stream.foreachRDD { rdd =>
>>   // Cast the rdd to an interface that lets us get an array of OffsetRange
>>   val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
>>
>>   rdd.foreachPartition { iter =>
>>     // index to get the correct offset range for the rdd partition
>>     // we're working on
>>     val osr: OffsetRange = offsetRanges(TaskContext.get.partitionId)
>>
>>     // get any needed data from the offset range
>>     val topic = osr.topic
>>     val kafkaPartitionId = osr.partition
>>     val begin = osr.fromOffset
>>     val end = osr.untilOffset
>>
>>     // Now we know the topic name, we can filter something
>>     // Or could we have referenced the topic name from
>>     // offsetRanges(TaskContext.get.partitionId) earlier
>>     // before we entered into stream.foreachRDD...?
>>   }
>> }
>>
>> *From:* Cody Koeninger [mailto:c...@koeninger.org]
>> *Sent:* Wednesday, October 21, 2015 3:01 PM
>> *To:* Dave Ariens
>> *Cc:* user@spark.apache.org
>> *Subject:* Re: Kafka Streaming and Filtering > 3000 partitons
>>
>>
>>
>> The rdd partitions are 1:1 with kafka topicpartitions, so you can use
>> offsets ranges to figure out which topic a given rdd partition is for and
>> proceed accordingly.  See the kafka integration guide in the spark
>> streaming docs for more details, or
>> https://github.com/koeninger/kafka-exactly-once
>>
>>
>>
>> As far as setting offsets in ZK, there's a private interface in the spark
>> codebase that would make it a little easier for you to do that.  You can
>> see that code for reference, or there's an outstanding ticket for making it
>> public https://issues.apache.org/jira/browse/SPARK-10963
>>
>>
>>
>> On Wed, Oct 21, 2015 at 1:50 PM, Dave Ariens 
>> wrote:
>>
>> Hey folks,
>>
>>
>>
>> I have a very large number of Kafka topics (many thousands of partitions)
>> that I want to consume, filter based on topic-specific filters, then
>> produce back to filtered topics in Kafka.
>>
>>
>>
>> Using the receiver-less based approach with Spark 1.4.1 (described here
>> )
>> I am able to use either KafkaUtils.createDirectStream or
>> KafkaUtils.createRDD, consume from many topics, and filter them with the
>> same filters but I can't seem to wrap my head around how to apply
>> topic-specific filters, or to finally produce to topic-specific destination
>> topics.
>>
>>
>>
>> Another point would be that I will need to checkpoint the metadata after
>> each successful batch and set starting offsets per partition back to ZK.  I
>> expect I can do that on the final RDDs after casting them accordingly, but
>> if anyone has any expertise/guidance doing that and is willing to share,
>> I'd be pretty 

Request for submitting Spark jobs in code purely, without jar

2015-10-22 Thread Yuhang Chen
Hi developers, I've encountered a problem with Spark, and before opening an 
issue, I'd like to hear your thoughts.


Currently, if you want to submit a Spark job, you'll need to write the code, 
make a jar, and then submit it with spark-submit or 
org.apache.spark.launcher.SparkLauncher. 


But sometimes the RDD operation chain is generated dynamically in code, from 
SQL or even a GUI, so it seems either inconvenient or impossible to make a 
separate jar. So I tried something like the following:

val conf = new SparkConf().setAppName("Demo").setMaster("yarn-client")
val sc = new SparkContext(conf)
sc.textFile("README.md").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).foreach(println) // A simple word count

When these lines are executed, a Spark job is submitted. However, some 
problems remain:
1. It doesn't support all deploy modes, such as yarn-cluster.
2. With the "only 1 SparkContext in 1 JVM" limit, I cannot run this twice.
3. It runs in the same process as my code; no child process is created.



Thus, what I wish for is that these problems be handled by Spark itself. My 
request can be described simply as "adding a submit() method to 
SparkContext / StreamingContext / SQLContext". I hope that if I add a line 
after the code above like this:

sc.submit()

then Spark can handle all the background submission processing for me.

I already opened an issue for this request before, but I couldn't make myself 
clear back then. So I wrote this email to discuss it with you. Please reply if 
you need further description, and I'll open an issue for this if you 
understand my request and believe that it's something worth doing.


Thanks a lot.


Yuhang Chen.

yuhang.c...@foxmail.com

Re: Request for submitting Spark jobs in code purely, without jar

2015-10-22 Thread Ali Tajeldin EDU
The Spark job-server project may help 
(https://github.com/spark-jobserver/spark-jobserver).
--
Ali

On Oct 21, 2015, at 11:43 PM, ??  wrote:

> Hi developers, I've encountered some problem with Spark, and before opening 
> an issue, I'd like to hear your thoughts.
> 
> Currently, if you want to submit a Spark job, you'll need to write the code, 
> make a jar, and then submit it with spark-submit or 
> org.apache.spark.launcher.SparkLauncher.
> 
> But sometimes, the RDD operation chain is transferred dynamically in code, 
> from SQL or even GUI. thus it seems either inconvenient or not possible to 
> make a separated jar. Then I tried something like below:
> val conf = new SparkConf().setAppName("Demo").setMaster("yarn-client")
> val sc = new SparkContext(conf)
> sc.textFile("README.md").flatMap(_.split(" ")).map((_, 
> 1)).reduceByKey(_+_).foreach(println) // A simple word count
> When they are executed, a Spark job is submitted. However, there are some 
> remaining problems:
> 1. It doesn't support all deploy modes, such as yarn-cluster.
> 2. With the "Only 1 SparkContext in 1 JVM" limit, I can not run this twice.
> 3. It runs within the same process with my code, no child process is created.
> 
> Thus, what I wish for is that the problems can be handle by Spark itself, and 
> my request can be simply described as a "adding submit() method for 
> SparkContext / StreamingContext / SQLContext". I hope if I added a line after 
> the code above like this:
> sc.submit()
> then Spark can handle all background submitting processing for me.
> 
> I already opened an issue before for this demand, but I couldn't make myself 
> clear back then. So I wrote this email and try to talk to you guys. Please 
> reply if you need further descriptions, and I'll open a issue for this if you 
> understand my demand and believe that it's something worth doing.
> 
> Thanks a lot.
> 
> Yuhang Chen.
> yuhang.c...@foxmail.com



Re: Maven Repository Hosting for Spark SQL 1.5.1

2015-10-22 Thread Deenar Toraskar
I can see this artifact in public repos
http://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10/1.5.1

http://central.maven.org/maven2/org/apache/spark/spark-sql_2.10/1.5.1/spark-sql_2.10-1.5.1.jar

Check your proxy settings or the list of repos you are using.
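
As a cross-check outside the IDE dialog, the same coordinates can be declared 
in a build file. The fragment below assumes sbt, which the thread does not 
mention, and the exact Scala 2.10.x patch version is picked only for 
illustration:

```scala
// build.sbt fragment (sbt is an assumption; the thread used IntelliJ's Maven dialog)
scalaVersion := "2.10.5" // any 2.10.x release; chosen only for illustration

// %% appends the Scala binary version, so this resolves to spark-sql_2.10:1.5.1
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.1"
```

If this resolves from the command line but the IDE still lists only older 
versions, the IDE's repository index or proxy settings are the more likely 
culprit.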

Deenar

On 22 October 2015 at 17:48, William Li  wrote:

>
> Hi – I tried to download Spark SQL for Scala 2.10, version 1.5.1, from
> IntelliJ using the Maven library dialog:
>
> -Project Structure
> -Global Library, click on the + to select Maven Repository
> -Type in org.apache.spark to see the list.
> -The list result only shows versions up to spark-sql_2.10-1.1.1
> -I tried to manually type in version 1.5.1, but it doesn’t download
> the correct list of files needed.
>
> I can’t see the 1.5.1 version. So there seems to be some problem, because
> IntelliJ reported that org.apache.spark:spark-sql_2.10-1.5.1 is not
> available or not complete.
>
> Does anyone know who to contact to verify that?
>
>
> Thanks,
>
> William.
>


Fwd: sqlContext load by offset

2015-10-22 Thread Kayode Odeyemi
Hi,

I'm trying to load a postgres table using the following expression:

val cachedIndex = cache.get("latest_legacy_group_index")
val mappingsDF = sqlContext.load("jdbc", Map(
  "url" -> Config.dataSourceUrl(mode, Some("mappings")),
  "dbtable" -> s"(select userid, yid, username from legacyusers offset $cachedIndex) as legacyusers")
)

I'd like to know if this expression is correct:

"dbtable" -> s"(select userid, yid, username from legacyusers offset $cachedIndex) as legacyusers"

As you can see, I'm trying to load the table records by offset.

I appreciate your help.
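
A parenthesised subquery with an alias is a form the JDBC source accepts for 
"dbtable", so the shape of the expression looks reasonable. Two things may be 
worth checking, sketched below in plain Scala: the value interpolated into the 
string should be a plain number (if cache.get returns an Option, the text 
"Some(...)" would end up in the SQL), and OFFSET without ORDER BY does not 
guarantee which rows are skipped. The helper names and the choice of userid as 
the ordering column are assumptions for illustration.

```scala
object JdbcOffsetQuery {
  // Build the subquery passed as "dbtable".
  // The ORDER BY column is an assumption: without a stable order,
  // OFFSET may skip different rows on each run.
  def dbtable(offset: Long, orderBy: String = "userid"): String = {
    require(offset >= 0, "offset must be non-negative")
    s"(select userid, yid, username from legacyusers order by $orderBy offset $offset) as legacyusers"
  }

  // Options map in the shape the post passes to sqlContext.load("jdbc", ...).
  def options(url: String, offset: Long): Map[String, String] =
    Map("url" -> url, "dbtable" -> dbtable(offset))
}
```

The URL would come from Config.dataSourceUrl as in the original snippet; it is 
a plain parameter here only to keep the sketch self-contained.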


Re: Spark 1.5.1+Hadoop2.6 .. unable to write to S3 (HADOOP-12420)

2015-10-22 Thread Steve Loughran

> On 22 Oct 2015, at 15:12, Ashish Shrowty  wrote:
> 
> I understand that there is some incompatibility with the API between Hadoop
> 2.6/2.7 and Amazon AWS SDK where they changed a signature of 
> com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold.
> The JIRA indicates that this would be fixed in Hadoop 2.8.
> (https://issues.apache.org/jira/browse/HADOOP-12420)
> 
> My question is - what are people doing today to access S3? I am unable to
> find an older JAR of the AWS SDK to test with.

It's on Maven:

http://mvnrepository.com/artifact/com.amazonaws/aws-java-sdk/1.7.4

> 
> Thanks,
> Ashish
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-Hadoop2-6-unable-to-write-to-S3-HADOOP-12420-tp25163.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 
> 





spark.deploy.zookeeper.url

2015-10-22 Thread Michal Čizmazia
Does the spark.deploy.zookeeper.url configuration work correctly when I
point it to a single virtual IP address with more hosts behind it (load
balancer or round robin)?

https://spark.apache.org/docs/latest/spark-standalone.html#high-availability

ZooKeeper FAQ also discusses this topic:

https://wiki.apache.org/hadoop/ZooKeeper/FAQ#A8

Thanks!


Re: Large number of conf broadcasts

2015-10-22 Thread Anders Arpteg
Yes, it seems unnecessary. I actually tried patching the
com.databricks.spark.avro reader to broadcast only once per dataset,
instead of once for every single file/partition. It seems to work just as
well, there are significantly fewer broadcasts, and I am not seeing
out-of-memory issues any more. Strange that more people have not reacted to
this, since the broadcasting seems completely unnecessary...
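
The difference between the two strategies can be modelled in a few lines of 
plain Scala. This is not the spark-avro patch itself, which is not shown in 
the thread; FakeBroadcast and Counter are stand-ins invented for the sketch. 
The size estimate uses the 23.1 KB per broadcast piece reported in the log 
quoted below.

```scala
object BroadcastReuse {
  // Stand-ins for a broadcast handle and a creation counter (hypothetical).
  final class Counter { var created = 0 }
  final case class FakeBroadcast(id: Int)

  def broadcast(counter: Counter): FakeBroadcast = {
    counter.created += 1
    FakeBroadcast(counter.created)
  }

  // Per file: the same configuration is broadcast again for every file.
  def perFile(files: Seq[String], counter: Counter): Seq[(String, FakeBroadcast)] =
    files.map(f => f -> broadcast(counter))

  // Per dataset: broadcast once, then share the handle across all files.
  def perDataset(files: Seq[String], counter: Counter): Seq[(String, FakeBroadcast)] = {
    val shared = broadcast(counter)
    files.map(f => f -> shared)
  }

  // Rough memory held by `count` broadcasts of `kbEach` KB, in MB.
  def estimatedMb(count: Int, kbEach: Double): Double = count * kbEach / 1024.0
}
```

With the numbers from the log, about 4,800 broadcasts of 23.1 KB each come to 
roughly 108 MB of configuration copies, against 23.1 KB when the handle is 
shared.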

Best,
Anders

On Thu, Oct 22, 2015 at 7:03 PM Koert Kuipers  wrote:

> I am seeing the same thing. It's gone completely crazy creating broadcasts
> for the last 15 mins or so. Killing it...
>
> On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg  wrote:
>
>> Hi,
>>
>> Running Spark 1.5.0 in yarn-client mode, and I am curious why there are
>> so many broadcasts being done when loading datasets with a large number of
>> partitions/files. I have datasets with thousands of partitions, i.e. HDFS
>> files in the Avro folder, and sometimes load hundreds of these large
>> datasets. I believe I have located the broadcast at line
>> SparkContext.scala:1006. It seems to just broadcast the Hadoop
>> configuration, and I don't see why it should be necessary to broadcast that
>> for EVERY file. Wouldn't it be possible to reuse the same broadcast
>> configuration? It is hardly the case that the configuration would be
>> different between each file in a single dataset. It seems to be wasting lots
>> of memory and needs to persist unnecessarily to disk (see below).
>>
>> Thanks,
>> Anders
>>
>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0
>> to disk
>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on
>> 10.254.35.24:49428 (size: 23.1 KB)
>> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as
>> bytes in memory (estimated size 23.1 KB, free 2.4 KB)
>> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in
>> memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
>> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from
>> hadoopFile at AvroRelation.scala:121
>> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
>> threshold of 1024.0 KB for computing block broadcast_4804 in memory
>> .
>> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
>> broadcast_4804 in memory! (computed 496.0 B so far)
>> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0
>> B (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
>> limit = 530.3 MB.
>> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to
>> disk instead.
>> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
>> curMem=556036460, maxMem=556038881
>> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
>> 15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0
>> from memory
>> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0
>> to disk
>>
>>
>
>


Re: Spark_1.5.1_on_HortonWorks

2015-10-22 Thread Steve Loughran

On 22 Oct 2015, at 02:47, Ajay Chander wrote:

Thanks for your time. I have followed your inputs and downloaded 
"spark-1.5.1-bin-hadoop2.6" on one of the nodes, say node1. When I ran the Pi 
test everything seemed to work fine, except that the spark-history-server 
running on node1 has gone down. It was complaining about a missing class:

15/10/21 16:41:28 INFO HistoryServer: Registered signal handlers for [TERM, 
HUP, INT]
15/10/21 16:41:28 WARN SparkConf: The configuration key 
'spark.yarn.applicationMaster.waitTries' has been deprecated as of Spark 1.3 
and and may be removed in the future. Please use the new key 
'spark.yarn.am.waitTime' instead.
15/10/21 16:41:29 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
15/10/21 16:41:29 INFO SecurityManager: Changing view acls to: root
15/10/21 16:41:29 INFO SecurityManager: Changing modify acls to: root
15/10/21 16:41:29 INFO SecurityManager: SecurityManager: authentication 
disabled; ui acls disabled; users with view permissions: Set(root); users with 
modify permissions: Set(root)
Exception in thread "main" java.lang.ClassNotFoundException: 
org.apache.spark.deploy.yarn.history.YarnHistoryProvider
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:348)
at org.apache.spark.util.Utils$.classForName(Utils.scala:173)
at 
org.apache.spark.deploy.history.HistoryServer$.main(HistoryServer.scala:231)
at 
org.apache.spark.deploy.history.HistoryServer.main(HistoryServer.scala)


OK, that's happening because the version of Spark we ship comes with 
integration with the YARN history provider, and that's not on your classpath.


Find the history server config and either unset spark.history.provider or set 
it back to the filesystem one:

spark.history.provider org.apache.spark.deploy.history.FsHistoryProvider
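
The failure mode can be reproduced in isolation: the configured provider is 
loaded by class name, so a name that is not on the classpath ends in a 
ClassNotFoundException. The sketch below is a standalone diagnostic in plain 
Scala, not something Spark does itself; the object and the fallback helper are 
hypothetical.

```scala
import scala.util.Try

object HistoryProviderCheck {
  val FsProvider = "org.apache.spark.deploy.history.FsHistoryProvider"

  // True if the named class can be loaded from the current classpath.
  def onClasspath(className: String): Boolean =
    Try(Class.forName(className)).isSuccess

  // Hypothetical helper: keep the configured provider if it is loadable,
  // otherwise fall back to the filesystem provider's name.
  def effectiveProvider(configured: Option[String]): String =
    configured.filter(onClasspath).getOrElse(FsProvider)
}
```

Run against the community assembly jar, a check like this would report the 
YARN history provider as missing, which matches the stack trace above.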

The 1.5.1-based binaries we'll push out will be set up for this and for the 
Spark apps to be able to publish to the timeline server.

Note that irrespective of how histories are saved, a 1.3 or 1.4 history server 
cannot view 1.5 histories. So you will need to run a 1.5.x history server to 
view the most recent histories. But you can run a 1.4 history server talking 
to the timeline service alongside a 1.5 history server working through HDFS.

See SPARK-1537 for the details of what's happening.



"java.io.IOException: Connection reset by peer" thrown on the resource manager when launching Spark on Yarn

2015-10-22 Thread PashMic
Hi all,

I am trying to launch a Spark job using yarn-client mode on a cluster. I
have already tried spark-shell with YARN and I can launch the application.
But I would also like to be able to run the driver program from, say, Eclipse,
while using the cluster to run the tasks. I have also added the spark-assembly
jar to HDFS and point to it by adding (HADOOP_CONF_DIR env variable) to
Eclipse, although I'm not sure if that's the best way to go about this.

My application does launch on the cluster (I can see it in the resource
manager's monitor) and it finishes "successfully", but without any results
coming back to the driver. I see the following exception in the Eclipse console:

WARN  10:11:08,375  Logging.scala:71 -- Lost task 0.0 in stage 1.0 (TID 1,
vanpghdcn2.pgdev.sap.corp): java.lang.NullPointerException
at
org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1.apply(ExistingRDD.scala:56)
at
org.apache.spark.sql.execution.RDDConversions$$anonfun$rowToRowRdd$1.apply(ExistingRDD.scala:55)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$17.apply(RDD.scala:686)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

ERROR 10:11:08,522  Logging.scala:75 -- Task 0 in stage 1.0 failed 4 times;
aborting job
INFO  10:11:08,538  SparkUtils.scala:67 --   SparkContext stopped


And I get the following in the ResourceManager log:

2015-10-22 10:08:57,126 WARN org.apache.hadoop.yarn.server.webapp.AppBlock:
Container with id 'container_1445462013958_0011_01_01' doesn't exist in
RM.
2015-10-22 10:10:37,400 INFO
org.apache.hadoop.yarn.server.resourcemanager.ClientRMService: Allocated new
applicationId: 12
2015-10-22 10:10:42,429 WARN
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: The specific
max attempts: 0 for application: 12 is invalid, because it is out of the
range [1, 2]. Use the global max attempts instead.
2015-10-22 10:10:42,429 INFO
org.apache.hadoop.yarn.server.resourcemanager.ClientRMService: Application
with id 12 submitted by user hdfs
2015-10-22 10:10:42,429 INFO
org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger: USER=hdfs
IP=10.161.43.118OPERATION=Submit Application Request
TARGET=ClientRMService
RESULT=SUCCESS  APPID=application_1445462013958_0012
2015-10-22 10:10:42,429 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl: Storing
application with id application_1445462013958_0012
2015-10-22 10:10:42,430 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl:
application_1445462013958_0012 State change from NEW to NEW_SAVING
2015-10-22 10:10:42,430 INFO
org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore: Storing
info for app: application_1445462013958_0012
2015-10-22 10:10:42,430 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl:
application_1445462013958_0012 State change from NEW_SAVING to SUBMITTED
2015-10-22 10:10:42,431 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue:
Application added - appId: application_1445462013958_0012 user: hdfs
leaf-queue of parent: root #applications: 1
2015-10-22 10:10:42,431 INFO
org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler:
Accepted application application_1445462013958_0012 from user: hdfs, in
queue: default
2015-10-22 10:10:42,432 INFO
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl:
application_1445462013958_0012 State change from SUBMITTED to ACCEPTED
2015-10-22 10:10:42,432 INFO

Re: [Spark Streaming] How do we reset the updateStateByKey values.

2015-10-22 Thread Sander van Dijk
I don't think it is possible in the way you try to do it. It is important
to remember that the statements you mention only set up the stream stages,
before the stream is actually running. Once it's running, you cannot
change, remove or add stages.

I am not sure how you determine your condition and what the actual change
should be when that condition is met: you say you want a different update
function, but then give a statement with the same update function and a
different source stream. Is the condition determined somehow from the data
coming through streamLogs, and is newData basically streamLogs again
(rather than a whole different data source)? In that case I can think of 3
things to try:

- if the condition you switch on can be determined independently from every
item in streamLogs, you can simply do an if/else inside updateResultsStream
to change how you determine your state
- if this is not the case, but you can determine when to switch your
condition for each key independently, you can extend your state type to
also keep track of your condition: rather than using
JavaPairDStream you make updatedResultsState a
JavaPairDStream> (assuming you have some
class Pair), and you make updateResultsStream update and check the state of
the boolean.
- finally, you can have a separate state stream that keeps track of your
condition globally, then join that with your main stream and use that to
update state. Something like:

// determineCondition should result in a reduction to a single item that
// signals whether the condition is met in the current batch;
// updateConditionState should remember that
conditionStateStream =
    streamLogs.reduce(determineCondition).updateStateByKey(updateConditionState)


// addCondition gets RDDs from streamLogs and single-item RDDs with the
// condition state and should add that state to each item in the streamLogs RDD
joinedStream = streamLogs.transformWith(conditionStateStream, addCondition)

// This is similar to the extended state type of the previous idea, but now
// your condition state is determined globally rather than per log entry
updatedResultsState = joinedStream.updateStateByKey(updateResultsStream)
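
The second idea (state extended with a flag) can be sketched as a pure update 
function. It is written in Scala rather than Java, in the
(Seq[V], Option[S]) => Option[S] shape that updateStateByKey takes in the 
Scala API. The reset rule used here, a negative value acting as the marker, is 
purely an assumption for illustration; the real condition would come from your 
data.

```scala
object ResettableSum {
  // State kept per key: the running sum plus a flag recording a reset.
  final case class SumState(sum: Long, wasReset: Boolean)

  // Hypothetical reset rule: a negative value in the batch is the reset marker.
  def isResetMarker(v: Long): Boolean = v < 0

  // (new values in this batch, previous state) => new state.
  // The state is never dropped (no None), only restarted.
  def update(newValues: Seq[Long], previous: Option[SumState]): Option[SumState] = {
    val batchSum = newValues.filterNot(isResetMarker).sum
    if (newValues.exists(isResetMarker))
      Some(SumState(batchSum, wasReset = true)) // keep the key, restart the sum
    else
      Some(SumState(previous.map(_.sum).getOrElse(0L) + batchSum, wasReset = false))
  }
}
```

Because the function always returns Some, the key stays in the state stream 
and only its value changes.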

I hope this applies to your case and that it makes sense, my Java is a bit
rusty :) and perhaps others can suggest better spark streaming methods that
can be used, but hopefully the idea is clear.

Sander

On Thu, Oct 22, 2015 at 4:06 PM Uthayan Suthakar 
wrote:

> Hello guys,
>
> I have a stream job that will carry out computations and update the state
> (SUM the value). At some point, I would like to reset the state. I could
> drop the state by setting 'None' but I don't want to drop it. I would like
> to keep the state but update the state.
>
>
> For example:
>
> JavaPairDStream updatedResultsState =
> streamLogs.updateStateByKey(updateResultsStream);
>
> At some condition, I would like to update the state by key but with the
> different values, hence different update function.
>
>
> e.g.
>
>  updatedResultsState = newData.updateStateByKey(resetResultsStream);
>
> But the values from newData.updateStateByKey cannot replace the values
> in streamLogs.updateStateByKey. Do you know how I could replace the state
> value in streamLogs with newData?
>
> Is this possible?
>


Re: Incremental load of RDD from HDFS?

2015-10-22 Thread Chris Spagnoli
Found the problem.  There was an rdd.distinct hiding in my code that I had
overlooked, and that caused this behavior (because instead of iterating over
the raw RDD, I was instead iterating over the RDD which had been derived
from it).

Thank you everyone!

- Chris



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Incremental-load-of-RDD-from-HDFS-tp25145p25166.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Running 2 spark application in parallel

2015-10-22 Thread Suman Somasundar
Hi all,

Is there a way to run 2 spark applications in parallel under Yarn in the same 
cluster?

Currently, if I submit 2 applications, one of them waits till the other one is 
completed.

I want both of them to start and run at the same time.

Thanks,
Suman.


Re: [Spark Streaming] How do we reset the updateStateByKey values.

2015-10-22 Thread Uthayan Suthakar
I need to take the value from one RDD and use it to update the state of the
other RDD. Is this possible?

On 22 October 2015 at 16:06, Uthayan Suthakar 
wrote:

> Hello guys,
>
> I have a streaming job that carries out computations and updates the state
> (SUMs the value). At some point, I would like to reset the state. I could
> drop the state by returning 'None', but I don't want to drop it. I would like
> to keep the state but update it.
>
>
> For example:
>
> JavaPairDStream updatedResultsState =
> streamLogs.updateStateByKey(updateResultsStream);
>
> At some condition, I would like to update the state by key but with the
> different values, hence different update function.
>
>
> e.g.
>
>  updatedResultsState = newData.updateStateByKey(resetResultsStream);
>
> But the values from newData.updateStateByKey cannot replace the values
> in streamLogs.updateStateByKey. Do you know how I could replace the state
> value in streamLogs with newData?
>
> Is this possible?
>


Re: Maven Repository Hosting for Spark SQL 1.5.1

2015-10-22 Thread William Li
Thanks Deenar for your response. I am able to get version 1.5.0 and other 
lower versions; they are all fine, just not 1.5.1. It's hard to believe it's 
the proxy settings.

What is interesting is that IntelliJ does a few things when downloading 
this jar: putting it into the .m2 repository, creating the pom, etc. It was 
able to do that successfully for all other versions but not 1.5.1.

I tried to download the jar manually and run the maven install; it also errored 
out, saying the package is incomplete.


From: Deenar Toraskar
Date: Thursday, October 22, 2015 at 10:36 AM
To: William Li
Cc: "user@spark.apache.org"
Subject: Re: Maven Repository Hosting for Spark SQL 1.5.1

I can see this artifact in public repos 
http://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10/1.5.1

http://central.maven.org/maven2/org/apache/spark/spark-sql_2.10/1.5.1/spark-sql_2.10-1.5.1.jar

check your proxy settings or the list of repos you are using.

Deenar

On 22 October 2015 at 17:48, William Li wrote:

Hi - I tried to download Spark SQL (spark-sql_2.10) version 1.5.1 from IntelliJ 
using the Maven library:

-Project Structure
-Global Library, click on the + to select Maven Repository
-Type in org.apache.spark to see the list.
-The list result only shows versions up to spark-sql_2.10-1.1.1
-I tried to manually type in version 1.5.1, but it doesn't download the 
correct list of files needed.

I can't see the 1.5.1 version. So there seems to be some problem, because 
IntelliJ reported that org.apache.spark:spark-sql_2.10-1.5.1 is not available 
or not complete.

Does anyone know who to contact to verify that?


Thanks,

William.



Re: Spark StreamingStatefull information

2015-10-22 Thread Adrian Tanase
The result of updateStateByKey is a DStream that emits the entire state every 
batch, as an RDD; there is nothing special about it.

It is easy to join / cogroup with another RDD if you have the correct keys in both.
You could load this one when the job starts and/or have it update with 
updateStateByKey as well, based on streaming updates from Cassandra.

Sent from my iPhone

> On 22 Oct 2015, at 12:54, Arttii  wrote:
> 
> Hi,
> 
> So I am working on a use case where clients are walking in and out of
> geofences and sending messages based on that.
> I currently have some in-memory broadcast vars to do certain lookups for
> client and geofence info; some of this is also coming from Cassandra.
> My current quandary is that I need to support the case where a user comes in
> and out of a geofence and also track how many messages have already been sent
> and do some logic based on that.
> 
> My stream is basically a bunch  of jsons
> {
> member:""
> beacon
> state:"exit","enter"
> }
> 
> 
> This information is invalidated at certain timesteps, say messages daily and
> geofences every few minutes. First I wondered whether broadcast vars are good
> for this, but it gets updated a lot, so I do not think I can periodically
> rebroadcast these from the driver.
> 
> So I was thinking this might be a perfect case for updateStateByKey, as I can
> track what is going on,
> and also track the time inside the values and return Nones to "pop" things.
> 
> Currently I cannot wrap my head around how to use this stream in
> conjunction with some other info that is coming in as DStreams or RDDs. All the
> examples for updateStateByKey basically do something to a stream with
> updateStateByKey and then foreach over it and persist to a store. I
> don't think writing to and reading from Cassandra on every batch to get this
> info is a good idea, because I might get stale info.
> 
> Is this a valid case, or am I missing the point and use case of this function?
> 
> Thanks,
> Artyom
> 
> 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-StreamingStatefull-information-tp25160.html



Re: Get the previous state string

2015-10-22 Thread Akhil Das
That way, you will eventually end up bloating up that list. Instead, you
could push the stream to a noSQL database (like hbase or cassandra etc) and
then read it back and join it with your current stream if that's what you
are looking for.
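
On the java.lang.UnsupportedOperationException in the quoted code: a likely
cause (an assumption, since the full stack trace is not shown) is that
arg0.add(...) is called on a list that does not support modification. Below
is a minimal plain-Java sketch of an update function that copies instead of
mutating its arguments. It uses java.util.Optional and hypothetical names
(AppendState, update) to stay self-contained, whereas Spark 1.x passes a
Guava Optional. Note that the state still grows without bound, which is the
bloating mentioned above:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

final class AppendState {
    /**
     * Appends the new values to the previous state without touching either input.
     * Copying into a fresh ArrayList avoids calling add() on a read-only list.
     */
    static Optional<List<String>> update(List<String> newValues,
                                         Optional<List<String>> oldState) {
        List<String> merged =
                new ArrayList<>(oldState.orElse(Collections.<String>emptyList()));
        merged.addAll(newValues);
        return Optional.of(merged);
    }

    public static void main(String[] args) {
        // Read-only input, standing in for a list the caller may not modify.
        List<String> batch1 = Collections.unmodifiableList(Arrays.asList("a", "b"));
        Optional<List<String>> state = update(batch1, Optional.empty());
        System.out.println(state.get()); // [a, b]

        state = update(Collections.singletonList("c"), state);
        System.out.println(state.get()); // [a, b, c]
    }
}
```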

Thanks
Best Regards

On Thu, Oct 15, 2015 at 6:11 PM, Yogesh Vyas  wrote:

> -- Forwarded message --
> From: Yogesh Vyas 
> Date: Thu, Oct 15, 2015 at 6:08 PM
> Subject: Get the previous state string
> To: user@spark.apache.org
>
>
> Hi,
> I am new to Spark and was trying to do some experiments with it.
>
> I had a JavaPairDStream RDD.
> I want to get the list of string from its previous state. For that I
> use updateStateByKey function as follows:
>
> final Function2,
> Optional> updateFunc =
>new Function2,
> Optional>() {
>
> public Optional call(List arg0,
> Optional arg1) throws Exception {
> // TODO Auto-generated method stub
> if(arg1.toString()==null)
>return Optional.of(arg0);
> else {
>arg0.add(arg1.toString());
>return Optional.of(arg0);
> }
>}
> };
>
> I want the function to append the new list of strings to the previous
> list and return the new list. But I am not able to do so. I am getting
> the "java.lang.UnsupportedOperationException" error.
> Can anyone help me out in getting the desired output?
>
>
>


Re: Analyzing consecutive elements

2015-10-22 Thread Dylan Hogg
Hi Sampo,

You could try zipWithIndex followed by a self join with shifted index
values like this:

val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
val rdd = sc.parallelize(arr)
val sorted = rdd.sortByKey(true)

val zipped = sorted.zipWithIndex.map(x => (x._2, x._1))
val pairs = zipped.join(zipped.map(x => (x._1 - 1, x._2))).sortBy(_._1)

Which produces the consecutive elements as pairs in the RDD for further
processing:
(0,((1,A),(3,B)))
(1,((3,B),(7,C)))
(2,((7,C),(8,D)))
(3,((8,D),(9,E)))

There are probably more efficient ways to do this, but if your dataset
isn't too big it should work for you.
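
If it helps to see the shifted-index join without a cluster, the same idea
can be sketched on a local list in plain Java. This is only an illustration
of the logic, not Spark code; the class and method names are made up for the
example:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ShiftedIndexJoin {
    /** Pairs each element with its successor by joining index i with shifted index i. */
    static List<List<String>> consecutivePairs(List<String> sorted) {
        Map<Integer, String> byIndex = new HashMap<>();  // like zipWithIndex
        Map<Integer, String> shifted = new HashMap<>();  // same data, index - 1
        for (int i = 0; i < sorted.size(); i++) {
            byIndex.put(i, sorted.get(i));
            shifted.put(i - 1, sorted.get(i));
        }

        List<List<String>> pairs = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            // Inner join: keep only indices present on both sides.
            if (shifted.containsKey(i)) {
                pairs.add(Arrays.asList(byIndex.get(i), shifted.get(i)));
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(consecutivePairs(Arrays.asList("A", "B", "C", "D", "E")));
        // [[A, B], [B, C], [C, D], [D, E]]
    }
}
```

The last element has no successor, so it drops out of the join, just as
index 4 does in the output above.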

Cheers,
Dylan.


On 22 October 2015 at 17:35, Sampo Niskanen 
wrote:

> Hi,
>
> I have analytics data with timestamps on each element.  I'd like to
> analyze consecutive elements using Spark, but haven't figured out how to do
> this.
>
> Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E]
> to an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze
> time-related elements.)
>
> How can this be achieved?
>
>
> *Sampo Niskanen*
>
> *Lead developer / Wellmo*
> sampo.niska...@wellmo.com
> +358 40 820 5291
>
>


Re: driver ClassNotFoundException when MySQL JDBC exceptions are thrown on executor

2015-10-22 Thread Akhil Das
Did you try passing the MySQL connector jar through --driver-class-path?

Thanks
Best Regards

On Sat, Oct 17, 2015 at 6:33 AM, Hurshal Patel  wrote:

> Hi all,
>
> I've been struggling with a particularly puzzling issue after upgrading to
> Spark 1.5.1 from Spark 1.4.1.
>
> When I use the MySQL JDBC connector and an exception (e.g.
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown on
> the executor, I get a ClassNotFoundException on the driver, which results
> in this error (logs are abbreviated):
>
> 15/10/16 17:20:59 INFO SparkContext: Starting job: collect at
> repro.scala:73
> ...
> 15/10/16 17:20:59 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
> 15/10/16 17:20:59 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID
> 3)
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
> at repro.Repro$$anonfun$main$3.apply$mcZI$sp(repro.scala:69)
> ...
> 15/10/16 17:20:59 WARN ThrowableSerializationWrapper: Task exception could
> not be deserialized
> java.lang.ClassNotFoundException:
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
> ...
> 15/10/16 17:20:59 ERROR TaskResultGetter: Could not deserialize
> TaskEndReason: ClassNotFound with classloader
> org.apache.spark.util.MutableURLClassLoader@7f08a6b1
> 15/10/16 17:20:59 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3,
> localhost): UnknownReason
> 15/10/16 17:20:59 ERROR TaskSetManager: Task 0 in stage 3.0 failed 1
> times; aborting job
> 15/10/16 17:20:59 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks
> have all completed, from pool
> 15/10/16 17:20:59 INFO TaskSchedulerImpl: Cancelling stage 3
> 15/10/16 17:20:59 INFO DAGScheduler: ResultStage 3 (collect at
> repro.scala:73) failed in 0.012 s
> 15/10/16 17:20:59 INFO DAGScheduler: Job 3 failed: collect at
> repro.scala:73, took 0.018694 s
>
>  In Spark 1.4.1, I get the following (logs are abbreviated):
> 15/10/16 17:42:41 INFO SparkContext: Starting job: collect at
> repro.scala:53
> ...
> 15/10/16 17:42:41 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
> 15/10/16 17:42:41 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID
> 2)
> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
> at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49)
> ...
> 15/10/16 17:42:41 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,
> localhost): com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
> at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49)
> ...
>
> 15/10/16 17:42:41 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1
> times; aborting job
> 15/10/16 17:42:41 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks
> have all completed, from pool
> 15/10/16 17:42:41 INFO TaskSchedulerImpl: Cancelling stage 2
> 15/10/16 17:42:41 INFO DAGScheduler: ResultStage 2 (collect at
> repro.scala:53) failed in 0.016 s
> 15/10/16 17:42:41 INFO DAGScheduler: Job 2 failed: collect at
> repro.scala:53, took 0.024584 s
>
>
> I have seriously screwed up somewhere or this is a change in behavior that
> I have not been able to find in the documentation. For those that are
> interested, a full repro and logs follow.
>
> Hurshal
>
> ---
>
> I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in various
> combinations of
>  * local/standalone mode
>  * putting mysql on the classpath with --jars/building a fat jar with
> mysql in it/manually running sc.addJar on the mysql jar
>  * --deploy-mode client/--deploy-mode cluster
> but nothing seems to change.
>
>
>
> Here is an example invocation, and the accompanying source code:
>
> $ ./bin/spark-submit --master local --deploy-mode client --class
> repro.Repro /home/nix/repro/target/scala-2.10/repro-assembly-0.0.1.jar
> Using Spark's default log4j profile:
> org/apache/spark/log4j-defaults.properties
> 15/10/16 17:40:53 INFO SparkContext: Running Spark version 1.5.1
> 15/10/16 17:40:53 WARN NativeCodeLoader: Unable to load native-hadoop
> library for your platform... using builtin-java classes where applicable
> 15/10/16 17:40:53 WARN Utils: Your hostname, choochootrain resolves to a
> loopback address: 127.0.1.1; using 10.0.1.97 instead (on interface wlan0)
> 15/10/16 17:40:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
> another address
> 15/10/16 17:40:53 INFO SecurityManager: Changing view acls to: root
> 15/10/16 17:40:53 INFO SecurityManager: Changing modify acls to: root
> 15/10/16 17:40:53 INFO SecurityManager: SecurityManager: authentication
> disabled; ui acls disabled; users with view permissions: Set(root); users
> with modify permissions: Set(root)
> 15/10/16 17:40:54 INFO Slf4jLogger: Slf4jLogger started
> 15/10/16 17:40:54 INFO Remoting: Starting remoting
> 15/10/16 17:40:54 INFO Remoting: Remoting started; listening on addresses
> :[akka.tcp://sparkDriver@10.0.1.97:48116]
> 15/10/16 17:40:54 INFO Utils: Successfully started service 'sparkDriver'
> on port 48116.
> 

Re: Output println info in LogMessage Info ?

2015-10-22 Thread Akhil Das
Yes, using log4j you can log everything. Here's a thread with an example:
http://stackoverflow.com/questions/28454080/how-to-log-using-log4j-to-local-file-system-inside-a-spark-application-that-runs

Thanks
Best Regards

On Sun, Oct 18, 2015 at 12:10 AM, kali.tumm...@gmail.com <
kali.tumm...@gmail.com> wrote:

> Hi All,
>
> In Unix I can print a warning or info message using LogMessage WARN "Hi All" or
> LogMessage INFO "Hello World". Is there a similar thing in Spark?
>
> Imagine I want to print the count of an RDD in the logs instead of using println.
>
> Thanks
> Sri
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Output-println-info-in-LogMessage-Info-tp25107.html
>
>


Re: Analyzing consecutive elements

2015-10-22 Thread Sampo Niskanen
Hi,

Sorry, I'm not very familiar with those methods and cannot find the 'drop'
method anywhere.

As an example:

val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
val rdd = sc.parallelize(arr)
val sorted = rdd.sortByKey(true)
// ... then what?


Thanks.


Best regards,

*Sampo Niskanen*

*Lead developer / Wellmo*
sampo.niska...@wellmo.com
+358 40 820 5291


On Thu, Oct 22, 2015 at 10:43 AM, Adrian Tanase  wrote:

> I'm not sure if there is a better way to do it directly using Spark APIs
> but I would try to use mapPartitions and then within each partition
> Iterable to:
>
> rdd.zip(rdd.drop(1)) - using the Scala collection APIs
>
> This should give you what you need inside a partition. I'm hoping that you
> can partition your data somehow (e.g. by user id or session id) in a way that
> makes your algorithm parallel. In that case you can use the snippet above in a
> reduceByKey.
>
> hope this helps
> -adrian
>
> Sent from my iPhone
>
> On 22 Oct 2015, at 09:36, Sampo Niskanen 
> wrote:
>
> Hi,
>
> I have analytics data with timestamps on each element.  I'd like to
> analyze consecutive elements using Spark, but haven't figured out how to do
> this.
>
> Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E]
> to an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze
> time-related elements.)
>
> How can this be achieved?
>
>
> *Sampo Niskanen*
>
> *Lead developer / Wellmo *
> sampo.niska...@wellmo.com
> +358 40 820 5291
>
>
>


Re: Can I convert RDD[My_OWN_JAVA_CLASS] to DataFrame in Spark 1.3.x?

2015-10-22 Thread Akhil Das
Have a look at
http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection
if you haven't seen that already.

Thanks
Best Regards

On Thu, Oct 15, 2015 at 10:56 PM, java8964  wrote:

> Hi, Sparkers:
>
> I wonder if I can convert a RDD of my own Java class into DataFrame in
> Spark 1.3.
>
> Here is what I am trying to achieve: I want to load the data from Cassandra
> and store it in HDFS using either AVRO or Parquet format. I want to
> test if I can do this in Spark.
>
> I am using Spark 1.3.1, with Cassandra Spark Connector 1.3. If I create a
> DataFrame directly using Cassandra Spark Connector 1.3, I have a problem
> to handle the UUID type in the Cassandra in the Spark.
>
> So I will try to create a RDD instead in the Cassandra Spark Connector
> 1.3, and save the data into a Java Object generated from the AVRO Schema,
> but I have problem to convert that RDD to DataFrame.
>
> If I use a case class, it works fine for me, as below:
>
> scala>val rdd = sc.cassandraTable("keyspace_name", "tableName")
> rdd:
> com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow]
> = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
>
> scala>case class Output (id1: Long, id2: String)
> scala>val outputRdd = rdd.map(row => Output(row.getLong("id1"),
> row.getUUID("id2").toString))
> scala>import sqlContext.implicits._
> scala> val df = outputRdd.toDF
> outputDF: org.apache.spark.sql.DataFrame = [id1: bigint, id2: string]
>
> So the above code works fine for a simple case class.
>
> But the table in Cassandra is more complex than this, so I want to
> reuse a Java object generated from an AVRO schema that matches
> the Cassandra table.
>
> Let's say there is already a Java class named "Coupon", which in fact is a
> Java class generated from the AVRO schema, but the following code is not
> working:
>
> scala>val rdd = sc.cassandraTable("keyspace_name", "tableName")
> rdd:
> com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow]
> = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:15
>
> scala>case class Output (id1: Long, id2: String)
> scala>val outputRdd = rdd.map(row => new Coupon(row.getLong("id1"),
> row.getUUID("id2").toString))
> outputRdd: org.apache.spark.rdd.RDD[Coupon] = MapPartitionsRDD[4] at map
> at :30
> scala>import sqlContext.implicits._
> scala> val df = outputRdd.toDF
> :32: error: value toDF is not a member of
> org.apache.spark.rdd.RDD[Coupon]
>val outputDF = outputRdd.toDF
>
> So my questions are:
>
> 1) Why does a case class work above, but not a custom Java class? Does
> toDF ONLY work with a Scala class?
> 2) I have to use a DataFrame, as I want to output to Avro format, which is only
> doable for a DataFrame, not an RDD. But I need the option to convert UUID with
> toString, so this type can work. What are my options?
> 3) I know that there is a SQLContext.createDataFrame method, but I only have an
> AVRO schema, not a DataFrame schema. If I have to use this method instead
> of toDF(), is there any easy way to get the DataFrame schema from an AVRO schema?
> 4) The real schema of "coupon" has quite a few structs, and even nested
> structures, so I don't want to create a case class in this case. I want to
> reuse my AVRO class; can I do that?
>
> Thanks
>
> Yong
>


Re: spark streaming failing to replicate blocks

2015-10-22 Thread Akhil Das
Can you try fixing spark.blockManager.port to a specific port and see if the
issue persists?

Thanks
Best Regards

On Mon, Oct 19, 2015 at 6:21 PM, Eugen Cepoi  wrote:

> Hi,
>
> I am running spark streaming 1.4.1 on EMR (AMI 3.9) over YARN.
> The job is reading data from Kinesis and the batch size is of 30s (I used
> the same value for the kinesis checkpointing).
> In the executor logs I can see every 5 seconds a sequence of stacktraces
> indicating that the block replication failed. I am using the default
> storage level MEMORY_AND_DISK_SER_2.
> WAL is not enabled nor checkpointing (the checkpoint dir is configured for
> the spark context but not for the streaming context).
>
> Here is an example of those logs for ip-10-63-160-18. They occur in every
> executor while trying to replicate to any other executor.
>
>
> 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
> [ip-10-63-160-18.ec2.internal/10.63.160.18:50929]
> 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection to 
> ip-10-63-160-18.ec2.internal/10.63.160.18:50929
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>   at 
> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>   at 
> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending 
> message.
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>   at 
> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>   at 
> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/10/19 03:11:55 INFO nio.ConnectionManager: Notifying 
> ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
> 15/10/19 03:11:55 INFO nio.ConnectionManager: Handling connection error on 
> connection to ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
> 15/10/19 03:11:55 WARN storage.BlockManager: Failed to replicate 
> input-1-144524231 to BlockManagerId(3, ip-10-159-151-22.ec2.internal, 
> 50929), failure #0
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>   at 
> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>   at 
> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/10/19 03:11:55 INFO nio.ConnectionManager: Removing SendingConnection to 
> ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
> 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
> [ip-10-63-160-18.ec2.internal/10.63.160.18:39506]
> 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection to 
> ip-10-63-160-18.ec2.internal/10.63.160.18:39506
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>   at 
> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>   at 
> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:745)
> 15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending 
> message.
> java.net.ConnectException: Connection refused
>   at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>   at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>   at 
> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>   at 
> 

Re: multiple pyspark instances simultaneously (same time)

2015-10-22 Thread Akhil Das
Did you read
https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application

Thanks
Best Regards

On Thu, Oct 15, 2015 at 11:31 PM, jeff.sadow...@gmail.com <
jeff.sadow...@gmail.com> wrote:

> I am having issues trying to set up Spark to run jobs simultaneously.
>
> I thought I wanted FAIR scheduling?
>
> I used the templated fairscheduler.xml as is. When I start pyspark I see the
> 3 expected pools:
> production, test, and default
>
> when I login as second user and run pyspark
> I see the expected pools as that user as well
>
> when I open up a webbrowser to http://master:8080
>
> I see my first user's state is running and my second user's state is
> waiting
>
> so I try putting them both in the production pool which is fair scheduler
>
> When I refresh http://master:8080
>
> the second user's status is still waiting.
>
> If I try to run something as the second user I get
>
> "Initial job has not accepted any resources"
>
> Maybe fair queuing is not what I want?
>
> I'm starting pyspark as follows
>
> pyspark --master spark://master:7077
>
> I started spark as follows
>
> start-all.sh
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/multiple-pyspark-instances-simultaneously-same-time-tp25079.html
>
>


Accessing external Kerberised resources from Spark executors in Yarn client/cluster mode

2015-10-22 Thread Deenar Toraskar
Hi All

I am trying to access a SQLServer that uses Kerberos for authentication
from Spark. I can successfully connect to the SQLServer from the driver
node, but any connection to SQLServer from the executors fails with "Failed to
find any Kerberos tgt".

org.apache.hadoop.security.UserGroupInformation.getCurrentUser on the
driver returns *myPrincipal (auth:KERBEROS)* as expected. The same call
on the executors,

sc.parallelize(0 to 10).map { _ =>(("hostname".!!).trim,
UserGroupInformation.getCurrentUser.toString)}.collect.distinct

returns

Array((hostname1, myprincipal (auth:SIMPLE)), (hostname2, myprincipal
(auth:SIMPLE)))


I tried passing the keytab and logging in explicitly from the executors,
but that didn't help either.

sc.parallelize(0 to 10).map { _
=>(SparkHadoopUtil.get.loginUserFromKeytab("myprincipal",SparkFiles.get("myprincipal.keytab")),
("hostname".!!).trim,
UserGroupInformation.getCurrentUser.toString)}.collect.distinct

Digging deeper I found SPARK-6207 and came across code for each Kerberised
service that is accessed from the executors in Yarn Client, such as

obtainTokensForNamenodes(nns, hadoopConf, credentials)
obtainTokenForHiveMetastore(hadoopConf, credentials)

I was wondering if anyone has been successful in accessing external
resources (running external to the Hadoop cluster) secured by Kerberos in
Spark executors running in Yarn.



Regards
Deenar


On 20 April 2015 at 21:58, Andrew Lee  wrote:

> Hi All,
>
> Affected version: spark 1.2.1 / 1.2.2 / 1.3-rc1
>
> Posting this problem to user group first to see if someone is encountering
> the same problem.
>
> When submitting spark jobs that invokes HiveContext APIs on a Kerberos
> Hadoop + YARN (2.4.1) cluster,
> I'm getting this error.
>
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
>
> Apparently, the Kerberos ticket is not on the remote data node nor
> computing node since we don't
> deploy Kerberos tickets, and that is not a good practice either. On the
> other hand, we can't just SSH to every machine and run kinit for that
> users. This is not practical and it is insecure.
>
> The point here is that shouldn't there be a delegation token during the
> doAs to use the token instead of the ticket ?
> I'm trying to understand what is missing in Spark's HiveContext API while
> a normal MapReduce job that invokes Hive APIs will work, but not in Spark
> SQL. Any insights or feedback are appreciated.
>
> Anyone got this running without pre-deploying (pre-initializing) all
> tickets node by node? Is this worth filing a JIRA?
>
>
>
> 15/03/25 18:59:08 INFO hive.metastore: Trying to connect to metastore with
> URI thrift://alee-cluster.test.testserver.com:9083
> 15/03/25 18:59:08 ERROR transport.TSaslTransport: SASL negotiation failure
> javax.security.sasl.SaslException: GSS initiate failed [Caused by
> GSSException: No valid credentials provided (Mechanism level: Failed to
> find any Kerberos tgt)]
> at
> com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
> at
> org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
> at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
> at
> org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
> at
> org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
> at
> org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:415)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
> at
> org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:336)
> at
> org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:214)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at
> org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1410)
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.<init>(RetryingMetaStoreClient.java:62)
> at
> org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:72)
> at
> org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:2453)
> at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:2465)
> at
> 

Re: Storing Compressed data in HDFS into Spark

2015-10-22 Thread Akhil Das
Convert your data to Parquet; it saves space and time.

Thanks
Best Regards

On Mon, Oct 19, 2015 at 11:43 PM, ahaider3  wrote:

> Hi,
> A lot of the data I have in HDFS is compressed. I noticed when I load this
> data into spark and cache it, Spark unrolls the data like normal but stores
> the data uncompressed in memory. For example, suppose /data/ is an RDD with
> compressed partitions on HDFS. I then cache the data. When I call
> /data.count()/, the data is rightly decompressed since it needs to find the
> value of /.count()/. But the data that is cached is also decompressed. Can
> a partition be compressed in Spark? I know Spark allows data to be
> compressed after serialization, but what if I only want the partitions
> compressed?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Storing-Compressed-data-in-HDFS-into-Spark-tp25123.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Storing Compressed data in HDFS into Spark

2015-10-22 Thread Igor Berman
check spark.rdd.compress
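
For context on that setting: spark.rdd.compress only applies to partitions that are cached in serialized form (for example StorageLevel.MEMORY_ONLY_SER), and it trades CPU time for memory. The following is a minimal JDK-only sketch of that trade-off, not Spark code. SerializedBlockDemo and its methods are made-up names, and GZIP merely stands in for whichever codec Spark is configured with.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPOutputStream;

/** Hypothetical demo class; not part of Spark. */
final class SerializedBlockDemo {

    /** Java-serializes the records into one byte array, like a serialized cached partition. */
    static byte[] serialize(List<String> records) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(new ArrayList<>(records));
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Compresses an already serialized block (GZIP is an assumption, used as a stand-in codec). */
    static byte[] compress(byte[] block) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(bytes)) {
                gzip.write(block);
            }
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Builds n repetitive CSV-like records, similar to typical log or sensor data. */
    static List<String> sampleRecords(int n) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            records.add("2015-10-22,sensor-" + (i % 10) + ",OK");
        }
        return records;
    }
}
```

Comparing serialize(...).length with compress(serialize(...)).length on repetitive records shows the saving; the cost is a decompression pass every time the cached block is read.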

On 19 October 2015 at 21:13, ahaider3  wrote:

> Hi,
> A lot of the data I have in HDFS is compressed. I noticed when I load this
> data into spark and cache it, Spark unrolls the data like normal but stores
> the data uncompressed in memory. For example, suppose /data/ is an RDD with
> compressed partitions on HDFS. I then cache the data. When I call
> /data.count()/, the data is rightly decompressed since it needs to find the
> value of /.count()/. But the data that is cached is also decompressed. Can
> a partition be compressed in Spark? I know Spark allows data to be
> compressed after serialization, but what if I only want the partitions
> compressed?


Re: java TwitterUtils.createStream() how create "user stream" ???

2015-10-22 Thread Akhil Das
I don't think the one that comes with spark would listen to specific user
feeds, but yes you can filter out the public tweets by passing the filters
argument. Here's an example for you to start
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala#L37

If you are really interested in specific user stream, then you would have
to probably create a custom receiver and add your logic to that (to fetch
only those tweets etc), you can read this doc to get started on that
http://spark.apache.org/docs/latest/streaming-custom-receivers.html

Thanks
Best Regards

On Tue, Oct 20, 2015 at 4:40 AM, Andy Davidson <
a...@santacruzintegration.com> wrote:

> Hi
>
> I wrote a little prototype that creates a “public stream”. Now I want to
> convert it to read tweets for a large number of explicit users.
>
> I want to create a “user stream” or a “site stream”. According to the Twitter
> developer doc, I should be able to set the “follow” parameter to a list of
> users I am interested in:
>
> https://dev.twitter.com/streaming/overview/request-parameters#follow
> *follow*
>
> *A comma-separated list of user IDs, indicating the users whose Tweets
> should be delivered on the stream. *
>
>
> I am not sure how to do this. I found the doc for createStream. I am
> guessing I need to set filters? Can anyone provide an example?
>
> Kind regards
>
> Andy
>
> http://spark.apache.org/docs/latest/api/java/index.html
>
>
> createStream
>
> public static JavaReceiverInputDStream<twitter4j.Status> createStream(
>     JavaStreamingContext jssc,
>     java.lang.String[] filters)
>
> Create a input stream that returns tweets received from Twitter using
> Twitter4J's default OAuth authentication; this requires the system
> properties twitter4j.oauth.consumerKey, twitter4j.oauth.consumerSecret,
> twitter4j.oauth.accessToken and twitter4j.oauth.accessTokenSecret. Storage
> level of the data will be the default StorageLevel.MEMORY_AND_DISK_SER_2.
>
> Parameters:
>   jssc - JavaStreamingContext object
>   filters - Set of filter strings to get only those tweets that match them
> Returns:
>   (undocumented)
>
>


Re: Maven Repository Hosting for Spark SQL 1.5.1

2015-10-22 Thread Sean Owen
Maven, in general, does some local caching to avoid hitting the repo
every time. It's possible this is why you're not seeing 1.5.1. On the
command line you can, for example, run "mvn -U ..." to force an update
check. I'm not sure of the equivalent in IntelliJ, but the command line
updates the same local repo that IntelliJ sees, so try that. The repo
definitely has 1.5.1, as you can see.
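
One way to see the caching at work: when a download fails, Maven leaves a marker file ending in ".lastUpdated" next to the artifact in the local repository and does not retry until its update interval has passed, unless -U is given. The sketch below is a small JDK-only helper that lists such markers. StaleArtifactFinder is a made-up name, and the directory you point it at (typically somewhere under ~/.m2/repository) is up to you.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper; not part of Maven or IntelliJ. */
final class StaleArtifactFinder {

    /** Returns all "*.lastUpdated" marker files below root, or an empty list if root is not a directory. */
    static List<Path> findFailedDownloads(Path root) {
        if (!Files.isDirectory(root)) {
            return Collections.emptyList();
        }
        try (Stream<Path> files = Files.walk(root)) {
            return files
                .filter(Files::isRegularFile)
                .filter(p -> p.getFileName().toString().endsWith(".lastUpdated"))
                .sorted()
                .collect(Collectors.toList());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

If the helper reports markers for the artifact in question, deleting that artifact directory (or running with -U) should make Maven attempt the download again.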

On Thu, Oct 22, 2015 at 11:44 AM, William Li  wrote:
> Thanks Deenar for your response. I am able to get version 1.5.0 and
> other lower versions; they are all fine, just not 1.5.1. It’s hard to
> believe it’s the proxy settings.
>
> What is interesting is that IntelliJ does a few things when downloading
> this jar: putting it into the .m2 repository, creating the pom, etc. It was
> able to do that successfully for all other versions but not 1.5.1.
>
> I tried to download the jar manually and run the Maven install; it also
> errored out, saying the package is incomplete.
>
>
> From: Deenar Toraskar 
> Date: Thursday, October 22, 2015 at 10:36 AM
> To: William Li 
> Cc: "user@spark.apache.org" 
> Subject: Re: Maven Repository Hosting for Spark SQL 1.5.1
>
> I can see this artifact in public repos
> http://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10/1.5.1
>
> http://central.maven.org/maven2/org/apache/spark/spark-sql_2.10/1.5.1/spark-sql_2.10-1.5.1.jar
>
> check your proxy settings or the list of repos you are using.
>
> Deenar
>
> On 22 October 2015 at 17:48, William Li  wrote:
>>
>>
>> Hi – I tried to download the Spark SQL 2.10 and version 1.5.1 from
>> Intellij using the maven library:
>>
>> -Project Structure
>> -Global Library, click on the + to select Maven Repository
>> -Type in org.apache.spark to see the list.
>> -The list result only shows version up to spark-sql_2.10-1.1.1
>> -I tried to manually type in the version 1.5.1 but it doesn’t download
>> the correct list of files needed.
>>
>> I can’t see the 1.5.1 version. So there seems to be some problem because
>> Intellij reported that org.apache.spark:spark-sql_2.10-1.5.1 is not
>> available or not complete.
>>
>> Does anyone know who to contact to verify that?
>>
>>
>> Thanks,
>>
>> William.
>
>




Re: [Spark Streaming] Design Patterns forEachRDD

2015-10-22 Thread Nipun Arora
Hi Sandip,

Thanks for your response..

I am not sure if this is the same thing.
I am looking for a way to connect to external network as shown in the
example.

@All - Can anyone else let me know if they have a better solution?

Thanks
Nipun

On Wed, Oct 21, 2015 at 2:07 PM, Sandip Mehta 
wrote:

> Does this help ?
>
> final JavaHBaseContext hbaseContext =
>     new JavaHBaseContext(javaSparkContext, conf);
> customerModels.foreachRDD(new Function() {
>   private static final long serialVersionUID = 1L;
>
>   @Override
>   public Void call(JavaRDD currentRDD) throws Exception {
>     // Enrich each partition via an HBase lookup.
>     JavaRDD customerWithPromotion = hbaseContext
>         .mapPartition(currentRDD, new PromotionLookupFunction());
>     customerWithPromotion.persist(StorageLevel.MEMORY_AND_DISK_SER());
>     customerWithPromotion.foreachPartition();
>     return null; // a Void-returning Function still has to return null
>   }
> });
>
>
> On 21-Oct-2015, at 10:55 AM, Nipun Arora  wrote:
>
> Hi All,
>
> Can anyone provide a design pattern for the following code shown in the
> Spark User Manual, in JAVA ? I have the same exact use-case, and for some
> reason the design pattern for Java is missing.
>
>  Scala version taken from :
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> dstream.foreachRDD { rdd =>
>   rdd.foreachPartition { partitionOfRecords =>
>     val connection = createNewConnection()
>     partitionOfRecords.foreach(record => connection.send(record))
>     connection.close()
>   }
> }
>
>
> I have googled for it and haven't really found a solution. This seems to
> be an important piece of information, especially for people who need to
> ship their code necessarily in Java because of constraints in the company
> (like me) :)
>
> I'd really appreciate any help
>
> Thanks
> Nipun
>
>
>
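
The connection-per-partition idea in the Scala snippet quoted above can be sketched in plain Java as follows. This is a JDK-only illustration and deliberately leaves Spark out: RecordConnection, PartitionSender and RecordingConnection are made-up names, and records are assumed to be strings. In a Spark job, sendAll(...) would be the body of the function handed to rdd.foreachPartition(...), which in turn is called from inside dstream.foreachRDD(...), so the connection is created on the executor and never serialized.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical connection type standing in for a socket, JDBC or HTTP client. */
interface RecordConnection extends AutoCloseable {
    void send(String record);

    @Override
    void close();
}

/** Hypothetical helper holding the per-partition logic. */
final class PartitionSender {

    /**
     * Sends every record of one partition over a single connection and
     * returns how many records were sent.
     */
    static int sendAll(Iterator<String> partitionOfRecords,
                       Supplier<RecordConnection> createNewConnection) {
        int sent = 0;
        // try-with-resources closes the connection even if send() throws
        try (RecordConnection connection = createNewConnection.get()) {
            while (partitionOfRecords.hasNext()) {
                connection.send(partitionOfRecords.next());
                sent++;
            }
        }
        return sent;
    }
}

/** In-memory connection used only to exercise the sketch. */
final class RecordingConnection implements RecordConnection {
    final List<String> received = new ArrayList<>();
    boolean closed = false;

    @Override
    public void send(String record) {
        received.add(record);
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

Opening the connection once per partition rather than once per record keeps the connection overhead proportional to the number of partitions; a connection pool would be the next refinement.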


Getting ClassNotFoundException: scala.Some on Spark 1.5.x

2015-10-22 Thread Babar Tareen
Hi,

I am getting the following exception when submitting a job to Spark 1.5.x from
Scala. The same code works with Spark 1.4.1. Any clues as to what might be
causing the exception?



*Code: App.scala*

import org.apache.spark.SparkContext

object App {
  def main(args: Array[String]) = {
    val l = List(1,2,3,4,5,6,7,8,9,0)
    val sc = new SparkContext("local[4]", "soark-test")
    val rdd = sc.parallelize(l)
    rdd.foreach(println)
    println(rdd.collect())
  }
}

*build.sbt*
lazy val sparkjob = (project in file("."))
  .settings(
    name := "SparkJob",
    version := "1.0",
    scalaVersion := "2.11.6",
    libraryDependencies := libs
  )

lazy val libs = Seq(
  "org.apache.spark" %% "spark-core" % "1.5.1"
)


*Exception:*
15/10/22 14:32:42 INFO DAGScheduler: Job 0 failed: foreach at app.scala:9, took 0.689832 s
[error] (run-main-0) org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage 0.0 (TID 2, localhost): java.io.IOException: java.lang.ClassNotFoundException: scala.Some
[error] at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
[error] at
org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
[error] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[error] at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
[error] at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
[error] at java.lang.reflect.Method.invoke(Method.java:497)
[error] at
java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
[error] at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
[error] at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
[error] at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
[error] at
java.io.ObjectInputStream.skipCustomData(ObjectInputStream.java:1959)
[error] at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
[error] at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
[error] at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
[error] at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
[error] at
java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
[error] at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
[error] at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
[error] at
java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
[error] at
org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:72)
[error] at
org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:98)
[error] at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:194)
[error] at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
[error] at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
[error] at java.lang.Thread.run(Thread.java:745)
[error] Caused by: java.lang.ClassNotFoundException: scala.Some
[error] at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
[error] at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
[error] at java.lang.Class.forName0(Native Method)
[error] at java.lang.Class.forName(Class.java:348)
[error] at
org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:67)
[error] at
java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1613)
[error] at
java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1518)
[error] at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1774)
[error] at
java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
[error] at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
[error] at
java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:501)
[error] at
org.apache.spark.Accumulable$$anonfun$readObject$1.apply$mcV$sp(Accumulators.scala:152)
[error] at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1160)
[error] ... 24 more
[error]
[error] Driver stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 2
in stage 0.0 failed 1 times, most recent failure: Lost task 2.0 in stage
0.0 (TID 2, localhost): java.io.IOException:
java.lang.ClassNotFoundException: scala.Some
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1163)
at org.apache.spark.Accumulable.readObject(Accumulators.scala:151)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at

Re: Save RandomForest Model from ML package

2015-10-22 Thread Sujit Pal
Hi Sebastian,

You can save models to disk and load them back up. In the snippet below
(copied out of a working Databricks notebook), I train a model, then save
it to disk, then retrieve it back into model2 from disk.

import org.apache.spark.mllib.tree.RandomForest
import org.apache.spark.mllib.tree.model.RandomForestModel

val model = RandomForest.trainClassifier(data, numClasses,
  categoricalFeaturesInfo,
  numTrees, featureSubsetStrategy, impurity, maxDepth, maxBins, seed)
model.save(sc, inputDir + "models/randomForestModel")

val model2 = RandomForestModel.load(sc, inputDir +
  "models/randomForestModel")


Not sure if there is PMML support. The model saves itself into a directory
structure that looks like this:

data/
  _SUCCESS
  _common_metadata
  _metadata
  part-r-*.gz.parquet (multiple files)
metadata/
  _SUCCESS
  part-0


HTH

-sujit




On Thu, Oct 22, 2015 at 5:33 AM, Sebastian Kuepers <
sebastian.kuep...@publicispixelpark.de> wrote:

> Hey,
>
> I am trying to figure out the best practice for saving and loading models
> which have been fitted with the ML package - i.e. with the RandomForest
> classifier.
>
> There is PMML support in the MLlib package afaik but not in ML - is that
> correct?
>
> How do you approach this, so that you do not have to fit your model before
> every prediction job?
>
> Thanks,
> Sebastian
>
>
> Sebastian Küpers
> Account Director
>
> Publicis Pixelpark
> Leibnizstrasse 65, 10629 Berlin
> T +49 30 5058 1838
> M +49 172 389 28 52
> sebastian.kuep...@publicispixelpark.de
> Web: publicispixelpark.de, Twitter: @pubpxp
> Facebook: publicispixelpark.de/facebook
> Publicis Pixelpark - eine Marke der Pixelpark AG
> Vorstand: Horst Wagner (Vorsitzender), Dirk Kedrowitsch
> Aufsichtsratsvorsitzender: Pedro Simko
> Amtsgericht Charlottenburg: HRB 72163
>
>
>
>
>
>


Spark YARN Shuffle service wire compatibility

2015-10-22 Thread Jong Wook Kim
Hi, I’d like to know if there is a guarantee that the Spark YARN shuffle service 
has wire compatibility between 1.x versions.

I could run a Spark 1.5 job with YARN NodeManagers running shuffle service 1.4, 
but it might’ve been just a coincidence.

Now we’re upgrading CDH from 5.3 to 5.4, whose NodeManager already has shuffle 
service 1.3 on its classpath, and we are concerned that it might not always be 
compatible with 1.5 jobs that expect the shuffle service.


Jong Wook



[SPARK STREAMING] polling based operation instead of event based operation

2015-10-22 Thread Nipun Arora
Hi,
In general, in Spark Streaming one can do transformations (filter, map, etc.)
or output operations (collect, forEach, etc.) in an event-driven paradigm,
i.e. the action happens only if a message is received.

Is it possible to do actions every few seconds in a polling-based fashion,
regardless of whether a new message has been received?

In my use case, I am filtering a stream and then doing operations on the
filtered stream. However, I would like to do operations on the data in the
stream every few seconds even if no message has been received in the stream.

For example, I have the following stream, where the violationChecking()
function is called only when a micro-batch is finished. Essentially
this also means that I must receive a message in this stream to do
processing. Is there any way that I can do the same operation every 10
seconds or so?

sortedMessages.foreach(
    new Function<JavaRDD<Tuple5<...>>, Void>() {
        @Override
        public Void call(JavaRDD<Tuple5<...>> tuple5JavaRDD) throws Exception {
            List<Tuple5<...>> list = tuple5JavaRDD.collect();
            violationChecking(list);
            return null;
        }
    }
);


Thanks,

Nipun


Re: Spark issue running jar on Linux vs Windows

2015-10-22 Thread Ted Yu
RemoteActorRefProvider is in akka-remote_2.10-2.3.11.jar

jar tvf ~/.m2/repository/com/typesafe/akka/akka-remote_2.10/2.3.11/akka-remote_2.10-2.3.11.jar | grep RemoteActorRefProvi
  1761 Fri May 08 16:13:02 PDT 2015 akka/remote/RemoteActorRefProvider$$anonfun$5.class
  1416 Fri May 08 16:13:02 PDT 2015 akka/remote/RemoteActorRefProvider$$anonfun$6.class

Is the above jar on your classpath ?

Cheers

On Thu, Oct 22, 2015 at 4:39 PM, Michael Lewis  wrote:

> Hi,
>
> I have a Spark driver process that I have built into a single ‘fat jar’
> this runs fine,  in Cygwin, on my development machine,
> I can run:
>
> scala -cp my-fat-jar-1.0.0.jar com.foo.MyMainClass
>
> this works fine, it will submit Spark job, they process, all good.
>
>
> However, on Linux (all Jars Spark(1.4) and Scala version (2.10.5) being
> the same), I get this error:
>
> 18:59:14.358 [Curator-QueueBuilder-2] ERROR
> o.a.c.f.r.queue.DistributedQueue - Exception processing queue item:
> queue-00
> java.lang.NoSuchMethodException:
> akka.remote.RemoteActorRefProvider.<init>(java.lang.String,
> akka.actor.ActorSystem$Settings, akka.event.EventStream,
> akka.actor.Scheduler, akka.act
> at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_60]
> at java.lang.Class.getDeclaredConstructor(Class.java:2178)
> ~[na:1.8.0_60]
>
>
> i.e. No such method exception.  Can anyone suggest how I can fix this?
>
> I’ve tried changing the scala to java and putting scale-lang on the class
> path, but this just generates new errors about missing akka configuration.
>
> Given the identical jars and scala version - I’m not sure why I’m getting
> this error running driver on Linux.
>
> Appreciate any help/pointers.
>
>
> Thanks,
> Mike Lewis
>


Spark issue running jar on Linux vs Windows

2015-10-22 Thread Michael Lewis
Hi,

I have a Spark driver process that I have built into a single ‘fat jar’. This 
runs fine in Cygwin on my development machine, where
I can run:
 
scala -cp my-fat-jar-1.0.0.jar com.foo.MyMainClass

This works fine: it submits Spark jobs, they are processed, all good.


However, on Linux (all Jars Spark(1.4) and Scala version (2.10.5) being the 
same), I get this error:

18:59:14.358 [Curator-QueueBuilder-2] ERROR o.a.c.f.r.queue.DistributedQueue - 
Exception processing queue item: queue-00
java.lang.NoSuchMethodException: 
akka.remote.RemoteActorRefProvider.<init>(java.lang.String, 
akka.actor.ActorSystem$Settings, akka.event.EventStream, akka.actor.Scheduler, 
akka.act
at java.lang.Class.getConstructor0(Class.java:3082) ~[na:1.8.0_60]
at java.lang.Class.getDeclaredConstructor(Class.java:2178) 
~[na:1.8.0_60]
 
i.e. No such method exception.  Can anyone suggest how I can fix this?

I’ve tried changing the scala command to java and putting scala-lang on the class 
path, but this just generates new errors about missing akka configuration.

Given the identical jars and scala version - I’m not sure why I’m getting this 
error running driver on Linux.

Appreciate any help/pointers.


Thanks,
Mike Lewis

Re: Maven Repository Hosting for Spark SQL 1.5.1

2015-10-22 Thread William Li
Thanks Sean. I did that but it didn't seem to help. However, I manually
downloaded both the pom and jar files from the site, and then ran
mvn dependency:purge-local-repository to clean up the local repo (and
re-download everything). All is good now and the error went away.

Thanks a lot for your help!

William.

On 10/22/15, 1:44 PM, "Sean Owen"  wrote:

>Maven, in general, does some local caching to avoid hitting the repo
>every time. It's possible this is why you're not seeing 1.5.1. On the
>command line you can for example add "mvn -U ..." Not sure of the
>equivalent in IntelliJ, but it will be updating the same repo IJ sees.
>Try that. The repo definitely has 1.5.1 as you can see.
>
>On Thu, Oct 22, 2015 at 11:44 AM, William Li  wrote:
>> Thanks Deenar for your response. I am able to get version 1.5.0 and
>> other lower versions; they are all fine, just not 1.5.1. It's hard to
>> believe it's the proxy settings.
>>
>> What is interesting is that IntelliJ does a few things when downloading
>> this jar: putting it into the .m2 repository, creating the pom, etc. It was
>> able to do that successfully for all other versions but not 1.5.1.
>>
>> I tried to download the jar manually and run the Maven install; it also
>> errored out, saying the package is incomplete.
>>
>>
>> From: Deenar Toraskar 
>> Date: Thursday, October 22, 2015 at 10:36 AM
>> To: William Li 
>> Cc: "user@spark.apache.org" 
>> Subject: Re: Maven Repository Hosting for Spark SQL 1.5.1
>>
>> I can see this artifact in public repos
>> http://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10/1.5.1
>>
>> 
>> http://central.maven.org/maven2/org/apache/spark/spark-sql_2.10/1.5.1/spark-sql_2.10-1.5.1.jar
>>
>> check your proxy settings or the list of repos you are using.
>>
>> Deenar
>>
>> On 22 October 2015 at 17:48, William Li  wrote:
>>>
>>>
>>> Hi – I tried to download the Spark SQL 2.10 and version 1.5.1 from
>>> Intellij using the maven library:
>>>
>>> -Project Structure
>>> -Global Library, click on the + to select Maven Repository
>>> -Type in org.apache.spark to see the list.
>>> -The list result only shows version up to spark-sql_2.10-1.1.1
>>> -I tried to manually type in the version 1.5.1 but it doesn't download
>>> the correct list of files needed.
>>>
>>> I can't see the 1.5.1 version. So there seems to be some problem because
>>> Intellij reported that org.apache.spark:spark-sql_2.10-1.5.1 is not
>>> available or not complete.
>>>
>>> Does anyone know who to contact to verify that?
>>>
>>>
>>> Thanks,
>>>
>>> William.
>>
>>





Best way to use Spark UDFs via Hive (Spark Thrift Server)

2015-10-22 Thread Dave Moyers
Hi,

We have several UDFs written in Scala that we use within jobs submitted to 
Spark. They work perfectly with the sqlContext after being registered. We also 
allow access to saved tables via the Hive Thrift server bundled with Spark. 
However, we would like to allow Hive connections to use the UDFs in their 
queries against the saved tables. Is there a way to register UDFs such that 
they can be used within both a Spark job and a Hive connection?

Thanks!
Dave




Spark StreamingStatefull information

2015-10-22 Thread Arttii
Hi,

So I am working on a use case where clients are walking in and out of
geofences and sending messages based on that.
I currently have some in-memory broadcast vars to do certain lookups for
client and geofence info; some of this is also coming from Cassandra.
My current quandary is that I need to support the case where a user comes in
and out of a geofence, and also track how many messages have already been sent
and do some logic based on that.

My stream is basically a bunch of JSONs:
{
member:""
beacon
state:"exit","enter"
}


This information is invalidated at certain time steps, say messages once a day
and geofence every few minutes. First I wondered whether broadcast vars are
good for this, but the data gets updated a lot, so I do not think I can
periodically rebroadcast it from the driver.

So I was thinking this might be a perfect case for updateStateByKey, as I can
track what is going on,
and also track the time inside the values and return Nones to "pop" things.
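
To make the "return None to pop" idea concrete, here is a minimal JDK-only sketch of what such a state update function could look like. It is an illustration under several assumptions, not Spark code: GeofenceState, GeofenceUpdater, the one-day TTL and the rule "count one message per entry into the geofence" are all invented for the example, and java.util.Optional is used so that it runs on a plain JDK (the Optional type that Spark's Java API expects may differ by version).

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical per-member state: inside or outside, messages sent, time of last event. */
final class GeofenceState {
    final boolean inside;
    final int messagesSent;
    final long lastUpdateMillis;

    GeofenceState(boolean inside, int messagesSent, long lastUpdateMillis) {
        this.inside = inside;
        this.messagesSent = messagesSent;
        this.lastUpdateMillis = lastUpdateMillis;
    }
}

/** Hypothetical update function in the shape updateStateByKey expects: (new values, old state) -> new state. */
final class GeofenceUpdater {
    // Assumption: state that has seen no event for a day is dropped.
    static final long TTL_MILLIS = 24L * 60 * 60 * 1000;

    static Optional<GeofenceState> update(List<String> newStates,
                                          Optional<GeofenceState> previous,
                                          long nowMillis) {
        // Expired state is treated as if it never existed.
        Optional<GeofenceState> live =
            previous.filter(s -> nowMillis - s.lastUpdateMillis < TTL_MILLIS);

        if (newStates.isEmpty()) {
            // No events in this batch: keep live state, or return empty to drop the key.
            return live;
        }

        boolean inside = live.map(s -> s.inside).orElse(false);
        int sent = live.map(s -> s.messagesSent).orElse(0);
        for (String state : newStates) {
            boolean nowInside = "enter".equals(state);
            if (nowInside && !inside) {
                sent++; // assumption: one message per transition into the geofence
            }
            inside = nowInside;
        }
        return Optional.of(new GeofenceState(inside, sent, nowMillis));
    }
}
```

Because the timestamp lives inside the state, expiry needs no external store: a key disappears on the first batch in which its state is older than the TTL.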

Currently I cannot wrap my head around how to use this stream in
conjunction with some other info that is coming in as DStreams or RDDs. All the
examples for updateStateByKey basically do something to a stream, call
updateStateByKey, and then foreach over it and persist to a store. I
don't think writing to and reading from Cassandra on every batch to get this
info is a good idea, because I might get stale info.

Is this a valid case, or am I missing the point and use case of this function?

Thanks,
Artyom







--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-StreamingStatefull-information-tp25160.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Accessing external Kerberised resources from Spark executors in Yarn client/cluster mode

2015-10-22 Thread Doug Balog
Another thing to check is to make sure each one of your executor nodes has the 
JCE jars installed.

try { javax.crypto.Cipher.getMaxAllowedKeyLength("AES") > 128 } catch { case e: java.security.NoSuchAlgorithmException => false }

Setting "-Dsun.security.krb5.debug=true" and "-Dsun.security.jgss.debug=true" 
in spark.executor.extraJavaOptions
and running loginUserFromKeytab() will generate a lot of info in the executor 
logs, which might be helpful to figure out what is going on too.
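
The same JCE check can be written as a small standalone Java class, which is handy if you want to run it directly with the JVM the executors use. This is a sketch: JceCheck is a made-up class name, while Cipher.getMaxAllowedKeyLength is the standard JDK call used in the one-liner above. The check matters because Kerberos tickets encrypted with AES-256 cannot be handled by a JVM restricted to 128-bit keys.

```java
import java.security.NoSuchAlgorithmException;
import javax.crypto.Cipher;

/** Hypothetical helper class; only JDK calls are used. */
final class JceCheck {

    /** Returns the maximum AES key length the JVM allows, or 0 if AES is unavailable. */
    static int maxAesKeyLength() {
        try {
            return Cipher.getMaxAllowedKeyLength("AES");
        } catch (NoSuchAlgorithmException e) {
            return 0;
        }
    }

    /** True when keys longer than 128 bits are allowed, i.e. the unlimited-strength policy is active. */
    static boolean hasUnlimitedStrength() {
        return maxAesKeyLength() > 128;
    }

    public static void main(String[] args) {
        System.out.println("max AES key length: " + maxAesKeyLength());
        System.out.println("unlimited strength: " + hasUnlimitedStrength());
    }
}
```

Running it on each executor host with the same java binary that YARN launches containers with avoids being misled by a different JDK on the driver machine.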

Cheers,

Doug


> On Oct 22, 2015, at 7:59 AM, Deenar Toraskar  
> wrote:
> 
> Hi All
> 
> I am trying to access a SQLServer that uses Kerberos for authentication from 
> Spark. I can successfully connect to the SQLServer from the driver node, but 
> any connections to SQLServer from executors fails with "Failed to find any 
> Kerberos tgt". 
> 
> org.apache.hadoop.security.UserGroupInformation.getCurrentUser on the driver 
> returns myPrincipal (auth:KERBEROS) as expected. The same call on the 
> executors,
> 
> sc.parallelize(0 to 10).map { _ => (("hostname".!!).trim, UserGroupInformation.getCurrentUser.toString) }.collect.distinct
> 
> returns
> 
> Array((hostname1, myprincipal (auth:SIMPLE)), (hostname2, myprincipal (auth:SIMPLE)))
> 
> 
> I tried passing the keytab and logging in explicitly from the executors, but 
> that didn't help either.
> 
> sc.parallelize(0 to 10).map { _ => (SparkHadoopUtil.get.loginUserFromKeytab("myprincipal", SparkFiles.get("myprincipal.keytab")), ("hostname".!!).trim, UserGroupInformation.getCurrentUser.toString) }.collect.distinct
> 
> Digging deeper I found SPARK-6207 and came across code for each Kerberised 
> service that is accessed from the executors in Yarn Client, such as
> 
> obtainTokensForNamenodes(nns, hadoopConf, credentials)
> 
> obtainTokenForHiveMetastore(hadoopConf, credentials)
> 
> I was wondering if anyone has been successful in accessing external resources 
> (running external to the Hadoop cluster) secured by Kerberos in Spark 
> executors running in Yarn. 
> 
> 
> 
> Regards
> Deenar
> 
> 
> On 20 April 2015 at 21:58, Andrew Lee  wrote:
> Hi All,
> 
> Affected version: spark 1.2.1 / 1.2.2 / 1.3-rc1
> 
> Posting this problem to user group first to see if someone is encountering 
> the same problem. 
> 
> When submitting spark jobs that invokes HiveContext APIs on a Kerberos Hadoop 
> + YARN (2.4.1) cluster, 
> I'm getting this error. 
> 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Failed to find 
> any Kerberos tgt)]
> 
> Apparently, the Kerberos ticket is not on the remote data node or compute 
> node, since we don't deploy Kerberos tickets, and that is not a good 
> practice either. On the other hand, we can't just SSH to every machine and 
> run kinit for those users. This is not practical and it is insecure.
> 
> The point here is that shouldn't there be a delegation token during the doAs 
> to use the token instead of the ticket ? 
> I'm trying to understand what is missing in Spark's HiveContext API while a 
> normal MapReduce job that invokes Hive APIs will work, but not in Spark SQL. 
> Any insights or feedback are appreciated.
> 
> Anyone got this running without pre-deploying (pre-initializing) all tickets 
> node by node? Is this worth filing a JIRA?
> 
> 
> 
> 15/03/25 18:59:08 INFO hive.metastore: Trying to connect to metastore with URI thrift://alee-cluster.test.testserver.com:9083
> 15/03/25 18:59:08 ERROR transport.TSaslTransport: SASL negotiation failure
> javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
>   at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
>   at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94)
>   at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:253)
>   at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
>   at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:52)
>   at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:49)
>   at java.security.AccessController.doPrivileged(Native Method)
>   at javax.security.auth.Subject.doAs(Subject.java:415)
>   at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1556)
>   at org.apache.hadoop.hive.thrift.client.TUGIAssumingTransport.open(TUGIAssumingTransport.java:49)
>   at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.open(HiveMetaStoreClient.java:336)
>   at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.<init>(HiveMetaStoreClient.java:214)
>   at 

Re: Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-22 Thread Cody Koeninger
That sounds like a networking issue to me.  Stuff to try:
- make sure every executor node can talk to every Kafka broker on the relevant
ports
- look at firewalls / network config.  Even if you can make the initial
connection, something may be happening after a while (we've seen ...
"interesting"... issues with AWS networking, for instance)
- look at the Kafka error logs
- look at lsof or even tcpdump to see what's happening with the relevant
ports when this occurs
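
As a starting point for the first check in the list above, a small TCP probe
can be run from each executor node. This is only a sketch: BrokerCheck,
canConnect and unreachable are made-up names, and the broker host/port pairs
are placeholders to replace with your own. It only shows whether a connection
can be opened right now, so it will not catch connections that get dropped
after a while.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of Spark or Kafka.
object BrokerCheck {
  /** True if a TCP connection to host:port can be opened within the timeout. */
  def canConnect(host: String, port: Int, timeoutMillis: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMillis)
      true
    } catch {
      // Covers refused connections, timeouts and unresolved host names.
      case _: IOException => false
    } finally {
      socket.close()
    }
  }

  /** Returns the brokers that could not be reached from this machine. */
  def unreachable(brokers: Seq[(String, Int)]): Seq[(String, Int)] =
    brokers.filterNot { case (host, port) => canConnect(host, port) }
}
```

For example, BrokerCheck.unreachable(Seq(("broker1.example.com", 9092)))
returns an empty list when that (placeholder) broker accepts connections.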



On Thu, Oct 22, 2015 at 9:00 AM, Conor Fennell 
wrote:

> Hi,
>
> Firstly want to say a big thanks to Cody for contributing the kafka
> direct stream.
>
> I have been using the receiver based approach for months but the
> direct stream is a much better solution for my use case.
>
> The job in question is now ported over to the direct stream doing
> idempotent outputs to Cassandra and outputting to kafka.
> I am also saving the offsets to Cassandra.
>
> But unfortunately I am sporadically getting the error below.
> It recovers and continues but gives a large spike in the processing
> delay. And it can happen in every 3 or 4 batches.
> I still have other receiver jobs running and they never throw these
> exceptions.
>
> I would be very appreciative for any direction and I can happily
> provide more detail.
>
> Thanks,
> Conor
>
> 15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not found, computing it
> 15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events, partition 0 offsets 13630747 -> 13633001
> 15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties
> 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id is overridden to
> 15/10/22 13:30:00 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to
> 15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
> 15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in stage 654.0 (TID 5242)
> java.nio.channels.ClosedChannelException
> at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
> at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
> at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
> at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
> at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
> at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
> at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
> at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
> at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
> at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> at org.apache.spark.scheduler.Task.run(Task.scala:88)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 15/10/22 13:31:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5243
> 15/10/22 13:31:00 INFO executor.Executor: Running task 1.0 in stage 654.0 (TID 5243)
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional 

Re: Storing Compressed data in HDFS into Spark

2015-10-22 Thread Adnan Haider
I believe spark.rdd.compress requires the data to be serialized. In my
case, I have data that is already compressed, and it gets decompressed when I
try to cache it. I believe that even when I set spark.rdd.compress to *true*,
Spark will still decompress the data, then serialize it, and then compress the
serialized data.
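
For reference, the setting under discussion is normally combined with a
serialized storage level, roughly as below. This is a sketch under
assumptions: the object name and application name are made up, the input path
is taken from the first program argument, and it needs the Spark 1.x core
library on the classpath. It does not keep the original HDFS compression; the
partitions are decompressed on read, serialized, and then compressed again by
Spark.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical example application, not taken from the thread.
object CompressedCacheSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("compressed-cache-sketch")
      // Only takes effect for serialized storage levels such as MEMORY_ONLY_SER.
      .set("spark.rdd.compress", "true")
    val sc = new SparkContext(conf)

    // args(0) is the input path, e.g. a directory of compressed text files on HDFS.
    val data = sc.textFile(args(0))
    data.persist(StorageLevel.MEMORY_ONLY_SER)

    // The first action materializes the cache in serialized, compressed form.
    println(data.count())
    sc.stop()
  }
}
```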

Although Parquet is an option, I believe it will only make sense to use it
when running Spark SQL. However, if I am using graphx or mllib will it
help?

Thanks, Adnan Haider
B.S Candidate, Computer Science
Illinois Institute of Technology

On Thu, Oct 22, 2015 at 7:15 AM, Igor Berman  wrote:

> check spark.rdd.compress
>
> On 19 October 2015 at 21:13, ahaider3  wrote:
>
>> Hi,
>> A lot of the data I have in HDFS is compressed. I noticed when I load this
>> data into spark and cache it, Spark unrolls the data like normal but
>> stores
>> the data uncompressed in memory. For example, suppose /data/ is an RDD
>> with
>> compressed partitions on HDFS. I then cache the data. When I call
>> /data.count()/, the data is rightly decompressed since it needs to find
>> the
>> value of /.count()/. But, the data that is cached is also decompressed.
>> Can
>> a partition be compressed in spark? I know spark allows for data to be
>> compressed, after serialization. But what if, I only want the partitions
>> compressed.
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Storing-Compressed-data-in-HDFS-into-Spark-tp25123.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: Error in starting Spark Streaming Context

2015-10-22 Thread Tiago Albineli Motta
I can't say what is happening, but I have a similar problem here.

While for you the source is:

org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
initialized


For me it is:

org.apache.spark.SparkException:
org.apache.spark.streaming.dstream.MapPartitionedDStream@7a2d07cc has
not been initialized


Here, the problem started after I changed my main class to use another
class to execute the stream.


Before:


object TopStream {

 //everything here

}


After


object TopStream {

   // call new TopStream.process( ... )

}


class TopStream extends Serializable {

}





Tiago Albineli Motta
Desenvolvedor de Software - Globo.com
ICQ: 32107100
http://programandosemcafeina.blogspot.com

On Wed, Jul 29, 2015 at 12:59 PM, Sadaf  wrote:

> Hi
>
> I am new to Spark Streaming and am writing code for a Twitter connector.
> When I run this code more than once, it gives the following exception. I have
> to create a new HDFS directory for checkpointing each time to make it run
> successfully, and moreover it doesn't stop.
>
> ERROR StreamingContext: Error starting the context, marking it as stopped
> org.apache.spark.SparkException: org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been initialized
> at org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
> at scala.Option.orElse(Option.scala:257)
> at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
> at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
> at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
> at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
> at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
> at org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
> at org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
> at org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
> at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
> at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
> at twitter.streamingSpark$.twitterConnector(App.scala:38)
> at twitter.streamingSpark$.main(App.scala:26)
> at twitter.streamingSpark.main(App.scala)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
> at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
> at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:192)
> at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:111)
> at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
>
> The relevant code is
>
> def twitterConnector(): Unit = {
>   val atwitter = managingCredentials()
>
>   val ssc = StreamingContext.getOrCreate("hdfsDirectory", () => managingContext())
>   fetchTweets(ssc, atwitter)
>
>   ssc.start() // Start the computation
>   ssc.awaitTermination()
> }
>
> def managingContext(): StreamingContext = {
>   // making spark context
>   val conf = new SparkConf().setMaster("local[*]").setAppName("twitterConnector")
>   val ssc = new StreamingContext(conf, Seconds(1))
>val sqlContext = new 

Re: driver ClassNotFoundException when MySQL JDBC exceptions are thrown on executor

2015-10-22 Thread Xiao Li
A few months ago, I used the DB2 JDBC drivers. I hit a couple of issues
when using --driver-class-path. In the end, I used the following command to
bypass most of the issues:

./bin/spark-submit --jars
/Users/smile/db2driver/db2jcc.jar,/Users/smile/db2driver/db2jcc_license_cisuz.jar
--master local[*] --class com.sparkEngine.
/Users/smile/spark-1.3.1-bin-hadoop2.3/projects/SparkApps-master/spark-load-from-db/target/-1.0.jar

Hopefully, it works for you.

Xiao Li


2015-10-22 4:56 GMT-07:00 Akhil Das :

> Did you try passing the mysql connector jar through --driver-class-path
>
> Thanks
> Best Regards
>
> On Sat, Oct 17, 2015 at 6:33 AM, Hurshal Patel 
> wrote:
>
>> Hi all,
>>
>> I've been struggling with a particularly puzzling issue after upgrading
>> to Spark 1.5.1 from Spark 1.4.1.
>>
>> When I use the MySQL JDBC connector and an exception (e.g.
>> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException) is thrown on
>> the executor, I get a ClassNotFoundException on the driver, which results
>> in this error (logs are abbreviated):
>>
>> 15/10/16 17:20:59 INFO SparkContext: Starting job: collect at
>> repro.scala:73
>> ...
>> 15/10/16 17:20:59 INFO Executor: Running task 0.0 in stage 3.0 (TID 3)
>> 15/10/16 17:20:59 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID
>> 3)
>> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
>> at repro.Repro$$anonfun$main$3.apply$mcZI$sp(repro.scala:69)
>> ...
>> 15/10/16 17:20:59 WARN ThrowableSerializationWrapper: Task exception
>> could not be deserialized
>> java.lang.ClassNotFoundException:
>> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
>> at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>> ...
>> 15/10/16 17:20:59 ERROR TaskResultGetter: Could not deserialize
>> TaskEndReason: ClassNotFound with classloader
>> org.apache.spark.util.MutableURLClassLoader@7f08a6b1
>> 15/10/16 17:20:59 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 3,
>> localhost): UnknownReason
>> 15/10/16 17:20:59 ERROR TaskSetManager: Task 0 in stage 3.0 failed 1
>> times; aborting job
>> 15/10/16 17:20:59 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose
>> tasks have all completed, from pool
>> 15/10/16 17:20:59 INFO TaskSchedulerImpl: Cancelling stage 3
>> 15/10/16 17:20:59 INFO DAGScheduler: ResultStage 3 (collect at
>> repro.scala:73) failed in 0.012 s
>> 15/10/16 17:20:59 INFO DAGScheduler: Job 3 failed: collect at
>> repro.scala:73, took 0.018694 s
>>
>>  In Spark 1.4.1, I get the following (logs are abbreviated):
>> 15/10/16 17:42:41 INFO SparkContext: Starting job: collect at
>> repro.scala:53
>> ...
>> 15/10/16 17:42:41 INFO Executor: Running task 0.0 in stage 2.0 (TID 2)
>> 15/10/16 17:42:41 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID
>> 2)
>> com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
>> at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49)
>> ...
>> 15/10/16 17:42:41 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2,
>> localhost): com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException
>> at repro.Repro$$anonfun$main$2.apply$mcZI$sp(repro.scala:49)
>> ...
>>
>> 15/10/16 17:42:41 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1
>> times; aborting job
>> 15/10/16 17:42:41 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose
>> tasks have all completed, from pool
>> 15/10/16 17:42:41 INFO TaskSchedulerImpl: Cancelling stage 2
>> 15/10/16 17:42:41 INFO DAGScheduler: ResultStage 2 (collect at
>> repro.scala:53) failed in 0.016 s
>> 15/10/16 17:42:41 INFO DAGScheduler: Job 2 failed: collect at
>> repro.scala:53, took 0.024584 s
>>
>>
>> Either I have seriously screwed up somewhere, or this is a change in behavior
>> that I have not been able to find in the documentation. For those who are
>> interested, a full repro and logs follow.
>>
>> Hurshal
>>
>> ---
>>
>> I am running this on Spark 1.5.1+Hadoop 2.6. I have tried this in various
>> combinations of
>>  * local/standalone mode
>>  * putting mysql on the classpath with --jars/building a fat jar with
>> mysql in it/manually running sc.addJar on the mysql jar
>>  * --deploy-mode client/--deploy-mode cluster
>> but nothing seems to change.
>>
>>
>>
>> Here is an example invocation, and the accompanying source code:
>>
>> $ ./bin/spark-submit --master local --deploy-mode client --class
>> repro.Repro /home/nix/repro/target/scala-2.10/repro-assembly-0.0.1.jar
>> Using Spark's default log4j profile:
>> org/apache/spark/log4j-defaults.properties
>> 15/10/16 17:40:53 INFO SparkContext: Running Spark version 1.5.1
>> 15/10/16 17:40:53 WARN NativeCodeLoader: Unable to load native-hadoop
>> library for your platform... using builtin-java classes where applicable
>> 15/10/16 17:40:53 WARN Utils: Your hostname, choochootrain resolves to a
>> loopback address: 127.0.1.1; using 10.0.1.97 instead (on interface wlan0)
>> 15/10/16 17:40:53 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to
>> another address
>> 15/10/16 

Spark 1.5.1+Hadoop2.6 .. unable to write to S3 (HADOOP-12420)

2015-10-22 Thread Ashish Shrowty
I understand that there is some incompatibility with the API between Hadoop
2.6/2.7 and Amazon AWS SDK where they changed a signature of 
com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold.
The JIRA indicates that this would be fixed in Hadoop 2.8.
(https://issues.apache.org/jira/browse/HADOOP-12420)

My question is - what are people doing today to access S3? I am unable to
find an older JAR of the AWS SDK to test with.

Thanks,
Ashish



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-Hadoop2-6-unable-to-write-to-S3-HADOOP-12420-tp25163.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Saving RDDs in Tachyon

2015-10-22 Thread mark
I have Avro records stored in Parquet files in HDFS. I want to read these
out as an RDD and save that RDD in Tachyon for any spark job that wants the
data.

How do I save the RDD in Tachyon? What format do I use? Which RDD
'saveAs...' method do I want?

Thanks


Re: Save to paquet files failed

2015-10-22 Thread Raghavendra Pandey
Can you increase the number of partitions and try again? Also, I don't think
you need to cache the DataFrames before saving them; you can do away with that
as well.

Raghav
On Oct 23, 2015 7:45 AM, "Ram VISWANADHA" 
wrote:

> Hi ,
> I am trying to load a 931MB file into an RDD, then create a DataFrame and
> store the data in a Parquet file. The Parquet save call is
> hanging. I have set the timeout to 1800, but the system still fails to
> respond and hangs. I can’t spot any errors in my code. Can someone help me?
> Thanks in advance.
>
> Environment
>
>1. OS X 10.10.5 with 8G RAM
>2. JDK 1.8.0_60
>
> Code
>
> final SQLContext sqlContext = new SQLContext(jsc);
> //convert user viewing history to ratings (hash user_id to int)
> JavaRDD ratingJavaRDD = createMappedRatingsRDD(jsc);
> //for testing with 2d_full.txt data
> //JavaRDD ratingJavaRDD = createMappedRatingRDDFromFile(jsc);
> JavaRDD ratingRowsRDD = ratingJavaRDD.map(new GenericRowFromRating());
> ratingRowsRDD.cache();
>
> //This line saves the files correctly
>
> ratingJavaRDD.saveAsTextFile("file:///Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd");
>
> final DataFrame ratingDF = sqlContext.createDataFrame(ratingRowsRDD,
> getStructTypeForRating());
> ratingDF.registerTempTable("rating_db");
> ratingDF.show();
> ratingDF.cache();
>
> //this line hangs
>
> ratingDF.write().format("parquet").save(
> "file:///Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet"
> );
>
>
> wks-195:rec-spark-java-poc r.viswanadha$ ls -lah
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-*
>
> -rw-r--r--  1 r.viswanadha  staff   785K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-0
>
> -rw-r--r--  1 r.viswanadha  staff   790K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-1
>
> -rw-r--r--  1 r.viswanadha  staff   786K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-2
>
> -rw-r--r--  1 r.viswanadha  staff   796K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-3
>
> -rw-r--r--  1 r.viswanadha  staff   791K Oct 22 18:55
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-4
>
> wks-195:rec-spark-java-poc r.viswanadha$ ls -lah
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet/_temporary/0/
>
> The only thing that is saved is the temporary part file
>
> wks-195:rec-spark-java-poc r.viswanadha$ ls -lah
> /Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet/_temporary/0/task_201510221857_0007_m_00/
>
> total 336
>
> drwxr-xr-x  4 r.viswanadha  staff   136B Oct 22 18:57 .
>
> drwxr-xr-x  4 r.viswanadha  staff   136B Oct 22 18:57 ..
>
> -rw-r--r--  1 r.viswanadha  staff   1.3K Oct 22 18:57
> .part-r-0-65562f67-357c-4645-8075-13b733a71ee5.gz.parquet.crc
>
> -rw-r--r--  1 r.viswanadha  staff   163K Oct 22 18:57
> part-r-0-65562f67-357c-4645-8075-13b733a71ee5.gz.parquet
>
>
> Active Stages (1)
>
> Stage Id: 7
> Description: save at Recommender.java:549
> Submitted: 2015/10/22 18:57:15
> Duration: 17 min
> Tasks (Succeeded/Total): 1/5
> Input: 9.4 MB
>
> Best Regards,
> Ram
>
>


How to restart a failed Spark Streaming Application automatically in client mode on YARN

2015-10-22 Thread y
I'm managing Spark Streaming applications which run on Cloud Dataproc
(https://cloud.google.com/dataproc/). Spark Streaming applications running
on a Cloud Dataproc cluster seem to run in client mode on YARN.

Some of my applications occasionally stop because of an application failure.

I'd like YARN to restart the stopped Spark Streaming applications
automatically, but I don't know how to achieve it. I tried setting a value for
"yarn.resourcemanager.am.max-attempts" and restarted YARN, but it didn't
restart the failed application automatically.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-restart-a-failed-Spark-Streaming-Application-automatically-in-client-mode-on-YARN-tp25172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: How to restart a failed Spark Streaming Application automatically in client mode on YARN

2015-10-22 Thread Saisai Shao
It looks like there is currently no way for a Spark Streaming application to
restart automatically in yarn-client mode. In yarn-client mode the AM and the
driver are two separate processes, and YARN only controls the restart of the
AM, not the driver, so this is not supported in yarn-client mode.

You can write some scripts to monitor your driver process, if it is gone
just restart it.
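
One way to sketch such a monitor is a small supervisor that launches the
driver command and relaunches it whenever it exits with a non-zero code. The
names DriverWatchdog and supervise are made up for illustration, and the
sketch assumes that a failed driver exits with a non-zero status, which you
should verify for your application. A cron job or a process supervisor would
do the same job.

```scala
import scala.sys.process._

// Hypothetical helper, not part of Spark or YARN.
object DriverWatchdog {
  /** Runs `command` and re-runs it while it exits with a non-zero code,
    * at most `maxRestarts` times. Returns the number of restarts performed.
    * Throws an IOException if the command itself cannot be started. */
  def supervise(command: Seq[String], maxRestarts: Int, backoffMillis: Long = 5000L): Int = {
    var restarts = 0
    var exitCode = Process(command).! // blocks until the process exits
    while (exitCode != 0 && restarts < maxRestarts) {
      restarts += 1
      Thread.sleep(backoffMillis) // short pause before relaunching
      exitCode = Process(command).!
    }
    restarts
  }
}
```

A call could look like DriverWatchdog.supervise(Seq("spark-submit",
"--master", "yarn-client", "--class", "com.example.MyStreamingApp",
"my-app.jar"), maxRestarts = 10), where the class and jar names are
placeholders.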

Thanks
Saisai


On Fri, Oct 23, 2015 at 1:04 PM, y  wrote:

> I'm managing Spark Streaming applications which run on Cloud Dataproc
> (https://cloud.google.com/dataproc/). Spark Streaming applications running
> on a Cloud Dataproc cluster seem to run in client mode on YARN.
>
> Some of my applications occasionally stop because of an application failure.
>
> I'd like YARN to restart the stopped Spark Streaming applications
> automatically, but I don't know how to achieve it. I tried setting a value for
> "yarn.resourcemanager.am.max-attempts" and restarted YARN, but it didn't
> restart the failed application automatically.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-restart-a-failed-Spark-Streaming-Application-automatically-in-client-mode-on-YARN-tp25172.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Save to paquet files failed

2015-10-22 Thread Ram VISWANADHA
Hi ,
I am trying to load a 931MB file into an RDD, then create a DataFrame and store 
the data in a Parquet file. The Parquet save call is hanging. I have 
set the timeout to 1800, but the system still fails to respond and hangs. I 
can’t spot any errors in my code. Can someone help me? Thanks in advance.

Environment

  1.  OS X 10.10.5 with 8G RAM
  2.  JDK 1.8.0_60

Code


final SQLContext sqlContext = new SQLContext(jsc);
//convert user viewing history to ratings (hash user_id to int)
JavaRDD ratingJavaRDD = createMappedRatingsRDD(jsc);
//for testing with 2d_full.txt data
//JavaRDD ratingJavaRDD = createMappedRatingRDDFromFile(jsc);
JavaRDD ratingRowsRDD = ratingJavaRDD.map(new GenericRowFromRating());
ratingRowsRDD.cache();

//This line saves the files correctly

ratingJavaRDD.saveAsTextFile("file:///Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd");

final DataFrame ratingDF = sqlContext.createDataFrame(ratingRowsRDD, 
getStructTypeForRating());
ratingDF.registerTempTable("rating_db");
ratingDF.show();
ratingDF.cache();

//this line hangs

ratingDF.write().format("parquet").save("file:///Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet");


wks-195:rec-spark-java-poc r.viswanadha$ ls -lah 
/Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-*

-rw-r--r--  1 r.viswanadha  staff   785K Oct 22 18:55 
/Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-0

-rw-r--r--  1 r.viswanadha  staff   790K Oct 22 18:55 
/Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-1

-rw-r--r--  1 r.viswanadha  staff   786K Oct 22 18:55 
/Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-2

-rw-r--r--  1 r.viswanadha  staff   796K Oct 22 18:55 
/Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-3

-rw-r--r--  1 r.viswanadha  staff   791K Oct 22 18:55 
/Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings_rdd/part-4

wks-195:rec-spark-java-poc r.viswanadha$ ls -lah 
/Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet/_temporary/0/

The only thing that is saved is the temporary part file

wks-195:rec-spark-java-poc r.viswanadha$ ls -lah 
/Users/r.viswanadha/Documents/workspace/rec-spark-java-poc/output/ratings.parquet/_temporary/0/task_201510221857_0007_m_00/

total 336

drwxr-xr-x  4 r.viswanadha  staff   136B Oct 22 18:57 .

drwxr-xr-x  4 r.viswanadha  staff   136B Oct 22 18:57 ..

-rw-r--r--  1 r.viswanadha  staff   1.3K Oct 22 18:57 
.part-r-0-65562f67-357c-4645-8075-13b733a71ee5.gz.parquet.crc

-rw-r--r--  1 r.viswanadha  staff   163K Oct 22 18:57 
part-r-0-65562f67-357c-4645-8075-13b733a71ee5.gz.parquet


Active Stages (1)

Stage Id: 7
Description: save at Recommender.java:549
Submitted: 2015/10/22 18:57:15
Duration: 17 min
Tasks (Succeeded/Total): 1/5
Input: 9.4 MB

Best Regards,
Ram



Re: Spark 1.5 on CDH 5.4.0

2015-10-22 Thread Sandy Ryza
Hi Deenar,

The version of Spark you have may not be compiled with YARN support.  If
you inspect the contents of the assembly jar, does
org.apache.spark.deploy.yarn.ExecutorLauncher exist?  If not, you'll need
to find a version that does have the YARN classes.  You can also build your
own using the -Pyarn flag.
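
To check the assembly jar programmatically, something like the following
could be used. It is only a sketch: AssemblyCheck and jarContainsClass are
made-up names, and the jar path in the usage note is a placeholder. Listing
the jar contents with the JDK's jar tool and searching for the class name
works just as well.

```scala
import java.util.jar.JarFile

// Hypothetical helper, not part of Spark.
object AssemblyCheck {
  /** True if the jar at `jarPath` contains a .class entry for `className`. */
  def jarContainsClass(jarPath: String, className: String): Boolean = {
    val jar = new JarFile(jarPath)
    try {
      // Class files are stored with '/' separators and a .class suffix.
      jar.getEntry(className.replace('.', '/') + ".class") != null
    } finally {
      jar.close()
    }
  }
}
```

For example, AssemblyCheck.jarContainsClass("/path/to/spark-assembly.jar",
"org.apache.spark.deploy.yarn.ExecutorLauncher") should return true for a
build that includes the YARN classes.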

-Sandy

On Thu, Oct 22, 2015 at 9:04 AM, Deenar Toraskar 
wrote:

> Hi I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
> http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
> working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
> well including connecting to the Hive metastore. I am facing an issue
> running spark jobs in yarn-client/yarn-cluster mode. The executors fail to
> start as java cannot find ExecutorLauncher.
>
> Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher
>
> client token: N/A
> diagnostics: Application application_1443531450011_13437 failed 2 times due to AM
> Container for appattempt_1443531450011_13437_02 exited with exitCode: 1
> Stack trace: ExitCodeException exitCode=1:
> at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
> at org.apache.hadoop.util.Shell.run(Shell.java:455)
> at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
> at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
> at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
> at java.util.concurrent.FutureTask.run(FutureTask.java:262)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
>
> Any ideas as to what might be going wrong. Also how can I turn on more
> detailed logging to see what command line is being run by Yarn to launch
> containers?
>
> Regards
> Deenar
>


Saving offset while reading from kafka

2015-10-22 Thread Ramkumar V
Hi,

I have written a Spark Streaming application that uses a Kafka stream and
writes to HDFS every hour (the batch time). I would like to know how to get
or commit the offsets of the Kafka stream while writing to HDFS, so that if
there is any issue or a redeployment, I can start from the point of the
previous successful offset commit. I want to store the offsets in an external
DB or something like that, not in ZooKeeper. If I want to resume the Kafka
stream from a particular offset, how do I do that in Spark?

*Thanks*,



(SOLVED) Ahhhh... Spark creates >30000 partitions... What can I do?

2015-10-22 Thread t3l
I was able to solve this by myself. What I did was change the way Spark
computes the partitioning for binary files.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/A-Spark-creates-3-partitions-What-can-I-do-tp25140p25170.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Whether Spark will use disk when the memory is not enough on MEMORY_ONLY Storage Level

2015-10-22 Thread JoneZhang
1. Will Spark use disk when there is not enough memory at the MEMORY_ONLY
storage level?
2. If not, how can I set the storage level when I use Hive on Spark?
3. Does Spark have any plans to choose dynamically between Hive on MapReduce
and Hive on Spark, based on SQL features?

Thanks in advance
Best regards



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Whether-Spark-will-use-disk-when-the-memory-is-not-enough-on-MEMORY-ONLY-Storage-Level-tp25171.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Re: Can we split partition

2015-10-22 Thread Akhil Das
Did you try coalesce? It doesn't shuffle the data around.

Thanks
Best Regards

On Wed, Oct 21, 2015 at 10:27 AM, shahid  wrote:

> Hi
>
> I have a large partition(data skewed) i need to split it to no. of
> partitions, repartitioning causes lot of shuffle. Can we do that..?
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Can-we-split-partition-tp25151.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: Analyzing consecutive elements

2015-10-22 Thread Adrian Tanase
Drop is a method on Scala’s collections (array, list, etc), not on Spark’s 
RDDs. You can use it as long as you work inside mapPartitions or something like 
reduceByKey, but it totally depends on the use-cases you have for analytics.

The others have suggested better solutions using only spark’s APIs.

-adrian

From: Sampo Niskanen
Date: Thursday, October 22, 2015 at 2:12 PM
To: Adrian Tanase
Cc: user
Subject: Re: Analyzing consecutive elements

Hi,

Sorry, I'm not very familiar with those methods and cannot find the 'drop' 
method anywhere.

As an example:

val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
val rdd = sc.parallelize(arr)
val sorted = rdd.sortByKey(true)
// ... then what?


Thanks.


Best regards,

Sampo Niskanen
Lead developer / Wellmo

sampo.niska...@wellmo.com
+358 40 820 5291


On Thu, Oct 22, 2015 at 10:43 AM, Adrian Tanase 
> wrote:
I'm not sure if there is a better way to do it directly using Spark APIs but I 
would try to use mapPartitions and then within each partition Iterable to:

rdd.zip(rdd.drop(1)) - using the Scala collection APIs

This should give you what you need inside a partition. I'm hoping that you can 
partition your data somehow (e.g. by user id or session id) so that your 
algorithm is parallel. In that case you can use the snippet above in a reduceByKey.
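
To make the zip/drop idea concrete, here is a minimal sketch on a plain Scala collection, which is roughly what you have inside mapPartitions once the partition's iterator is materialized. The helper name consecutivePairs and the sample data are only for illustration and are not part of Spark. Note that pairs spanning two partitions are not produced this way.

```scala
// Pairs each element with its successor using only Scala collection APIs.
// `consecutivePairs` is a hypothetical helper name, not a Spark method.
def consecutivePairs[A](xs: Seq[A]): Seq[(A, A)] =
  xs.zip(xs.drop(1))

// Stand-in for the (already sorted) contents of a single partition.
val partition = Seq((1, "A"), (3, "B"), (7, "C"), (8, "D"), (9, "E"))

val pairs = consecutivePairs(partition)
pairs.foreach(println)
```

A collection with fewer than two elements simply yields no pairs, so empty partitions need no special handling.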

hope this helps
-adrian

Sent from my iPhone

On 22 Oct 2015, at 09:36, Sampo Niskanen 
> wrote:

Hi,

I have analytics data with timestamps on each element.  I'd like to analyze 
consecutive elements using Spark, but haven't figured out how to do this.

Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E] to 
an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze 
time-related elements.)

How can this be achieved?


Sampo Niskanen
Lead developer / Wellmo

sampo.niska...@wellmo.com
+358 40 820 5291




Re: How to distinguish columns when joining DataFrames with shared parent?

2015-10-22 Thread Xiao Li
Actually, I found a design issue in self joins. When we have multiple layers
of projections above an alias, the relationship between the alias and the
actual columns is lost. Thus, when resolving the alias in self joins, the
rules treat the alias (e.g., in a Projection) as a normal column. This only
happens when using DataFrames. When using SQL, the duplicate names after a
self join will prevent another self join.

We need a mechanism to trace back the original/actual column for each
alias, like what RDBMS optimizers do. The most efficient way is to store
the alias information directly in the node to indicate whether it comes
from an alias; otherwise, we need to traverse the underlying tree for each
column to confirm whether it comes from an alias, even when it does not.

Good luck,

Xiao Li

2015-10-21 16:33 GMT-07:00 Isabelle Phan :

> Ok, got it.
> Thanks a lot Michael for the detailed reply!
> On Oct 21, 2015 1:54 PM, "Michael Armbrust" 
> wrote:
>
>> Yeah, I was suggesting that you avoid using  
>> org.apache.spark.sql.DataFrame.apply(colName:
>> String) when you are working with self joins as it eagerly binds to a
>> specific column in a way that breaks when we do the rewrite of one side of
>> the query.  Using the apply method constructs a resolved column eagerly
>> (which loses the alias information).
>>
>> On Wed, Oct 21, 2015 at 1:49 PM, Isabelle Phan  wrote:
>>
>>> Thanks Michael and Ali for the reply!
>>>
>>> I'll make sure to use unresolved columns when working with self joins
>>> then.
>>>
>>> As pointed by Ali, isn't there still an issue with the aliasing? It
>>> works when using org.apache.spark.sql.functions.col(colName: String)
>>> method, but not when using org.apache.spark.sql.DataFrame.apply(colName:
>>> String):
>>>
>>> scala> j.select(col("lv.value")).show
>>> +-----+
>>> |value|
>>> +-----+
>>> |   10|
>>> |   20|
>>> +-----+
>>>
>>>
>>> scala> j.select(largeValues("lv.value")).show
>>> +-+
>>> |value|
>>> +-+
>>> |1|
>>> |5|
>>> +-+
>>>
>>> Or does this behavior have the same root cause as detailed in Michael's
>>> email?
>>>
>>>
>>> -Isabelle
>>>
>>>
>>>
>>>
>>> On Wed, Oct 21, 2015 at 11:46 AM, Michael Armbrust <
>>> mich...@databricks.com> wrote:
>>>
 Unfortunately, the mechanisms that we use to differentiate columns
 automatically don't work particularly well in the presence of self joins.
 However, you can get it to work if you use the $"column" syntax
 consistently:

 val df = Seq((1, 1), (1, 10), (2, 3), (3, 20), (3, 5), (4, 10))
   .toDF("key", "value")
 val smallValues = df.filter('value < 10).as("sv")
 val largeValues = df.filter('value >= 10).as("lv")

 smallValues
   .join(largeValues, $"sv.key" === $"lv.key")
   .select($"sv.key".as("key"), $"sv.value".as("small_value"),
     $"lv.value".as("large_value"))
   .withColumn("diff", $"small_value" - $"large_value")
   .show()

 +---+-----------+-----------+----+
 |key|small_value|large_value|diff|
 +---+-----------+-----------+----+
 |  1|          1|         10|  -9|
 |  3|          5|         20| -15|
 +---+-----------+-----------+----+


 The problem with the other cases is that calling
 smallValues("columnName") or largeValues("columnName") is eagerly
 resolving the attribute to the same column (since the data is actually
 coming from the same place).  By the time we realize that you are joining
 the data with itself (at which point we rewrite one side of the join to use
 different expression ids) it's too late.  At the core the problem is that in
 Scala we have no easy way to differentiate largeValues("columnName")
 from smallValues("columnName").  This is because the data is coming
 from the same DataFrame and we don't actually know which variable name you
 are using.  There are things we can change here, but it's pretty hard to
 change the semantics without breaking other use cases.

 So, this isn't a straightforward "bug", but it's definitely a usability
 issue.  For now, my advice would be: only use unresolved columns (i.e.
 $"[alias.]column" or col("[alias.]column")) when working with self
 joins.

 Michael

>>>
>>>
>>


Save RandomForest Model from ML package

2015-10-22 Thread Sebastian Kuepers
Hey,

I am trying to figure out the best practice for saving and loading models that 
have been fitted with the ML package, e.g. with the RandomForest classifier.

There is PMML support in the MLlib package afaik, but not in ML. Is that correct?

How do you approach this, so that you do not have to fit your model before 
every prediction job?

Thanks,
Sebastian


Sebastian Küpers
Account Director

Publicis Pixelpark
Leibnizstrasse 65, 10629 Berlin
T +49 30 5058 1838
M +49 172 389 28 52
sebastian.kuep...@publicispixelpark.de
Web: publicispixelpark.de, Twitter: @pubpxp
Facebook: publicispixelpark.de/facebook
Publicis Pixelpark - eine Marke der Pixelpark AG
Vorstand: Horst Wagner (Vorsitzender), Dirk Kedrowitsch
Aufsichtsratsvorsitzender: Pedro Simko
Amtsgericht Charlottenburg: HRB 72163








Sporadic error after moving from kafka receiver to kafka direct stream

2015-10-22 Thread Conor Fennell
Hi,

Firstly want to say a big thanks to Cody for contributing the kafka
direct stream.

I have been using the receiver based approach for months but the
direct stream is a much better solution for my use case.

The job in question is now ported over to the direct stream doing
idempotent outputs to Cassandra and outputting to kafka.
I am also saving the offsets to Cassandra.

But unfortunately I am sporadically getting the error below.
It recovers and continues but gives a large spike in the processing
delay. And it can happen in every 3 or 4 batches.
I still have other receiver jobs running and they never throw these exceptions.

I would be very appreciative for any direction and I can happily
provide more detail.

Thanks,
Conor

15/10/22 13:30:00 INFO spark.CacheManager: Partition rdd_1528_0 not found, computing it
15/10/22 13:30:00 INFO kafka.KafkaRDD: Computing topic events, partition 0 offsets 13630747 -> 13633001
15/10/22 13:30:00 INFO utils.VerifiableProperties: Verifying properties
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property group.id is overridden to
15/10/22 13:30:00 INFO utils.VerifiableProperties: Property zookeeper.connect is overridden to
15/10/22 13:30:30 INFO consumer.SimpleConsumer: Reconnect due to socket error: java.nio.channels.ClosedChannelException
15/10/22 13:31:00 ERROR executor.Executor: Exception in task 0.0 in stage 654.0 (TID 5242)
java.nio.channels.ClosedChannelException
at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
at kafka.consumer.SimpleConsumer.liftedTree1$1(SimpleConsumer.scala:78)
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(SimpleConsumer.scala:68)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1$$anonfun$apply$mcV$sp$1.apply(SimpleConsumer.scala:112)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply$mcV$sp(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.consumer.SimpleConsumer$$anonfun$fetch$1.apply(SimpleConsumer.scala:111)
at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
at kafka.consumer.SimpleConsumer.fetch(SimpleConsumer.scala:110)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.fetchBatch(KafkaRDD.scala:192)
at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.getNext(KafkaRDD.scala:208)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:277)
at org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:262)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
15/10/22 13:31:00 INFO executor.CoarseGrainedExecutorBackend: Got assigned task 5243
15/10/22 13:31:00 INFO executor.Executor: Running task 1.0 in stage 654.0 (TID 5243)




RE: Analyzing consecutive elements

2015-10-22 Thread Andrianasolo Fanilo
Hi Sampo,


There is a sliding method you could try in 
org.apache.spark.mllib.rdd.RDDFunctions, though it’s DeveloperApi stuff 
(https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions)



import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.rdd.RDDFunctions._

object Test {

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf()
    sparkConf.setMaster("local").setAppName("sandbox")

    val sc = new SparkContext(sparkConf)

    val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
    val rdd = sc.parallelize(arr)
    val sorted = rdd.sortByKey(true)

    print(sorted.sliding(2).map(x => (x(0), x(1))).collect().toSeq)

    sc.stop()
  }
}



prints



WrappedArray(((1,A),(3,B)), ((3,B),(7,C)), ((7,C),(8,D)), ((8,D),(9,E)))



Otherwise you could try to convert your RDD to a DataFrame then use windowing 
functions in SparkSQL with the LEAD/LAG functions.



Best regards,

Fanilo



From: Dylan Hogg [mailto:dylanh...@gmail.com]
Sent: Thursday, October 22, 2015 13:44
To: Sampo Niskanen
Cc: user
Subject: Re: Analyzing consecutive elements

Hi Sampo,
You could try zipWithIndex followed by a self join with shifted index values 
like this:

val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
val rdd = sc.parallelize(arr)
val sorted = rdd.sortByKey(true)

val zipped = sorted.zipWithIndex.map(x => (x._2, x._1))
val pairs = zipped.join(zipped.map(x => (x._1 - 1, x._2))).sortBy(_._1)
Which produces the consecutive elements as pairs in the RDD for further 
processing:
(0,((1,A),(3,B)))
(1,((3,B),(7,C)))
(2,((7,C),(8,D)))
(3,((8,D),(9,E)))
There are probably more efficient ways to do this, but if your dataset isn't 
too big it should work for you.
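
To see why shifting the index by one lines up neighbours, the same steps can be traced on a local collection. This is only a sketch: plain Scala Maps stand in for the keyed RDDs, the index is an Int here (RDD zipWithIndex gives a Long), and the val names are made up for illustration.

```scala
// Local stand-in for the sorted RDD, using the data from the example above.
val sorted = Seq((1, "A"), (3, "B"), (7, "C"), (8, "D"), (9, "E"))

// zipWithIndex, then key by index: index -> element.
val zipped: Map[Int, (Int, String)] =
  sorted.zipWithIndex.map { case (elem, idx) => (idx, elem) }.toMap

// Shift every index down by one, so element i+1 is now keyed by i.
val shifted: Map[Int, (Int, String)] =
  zipped.map { case (idx, elem) => (idx - 1, elem) }

// Inner join on the index: only indices present on both sides survive,
// which drops the last element (it has no successor).
val joined: Seq[(Int, ((Int, String), (Int, String)))] =
  zipped.toSeq
    .flatMap { case (idx, elem) => shifted.get(idx).map(next => (idx, (elem, next))) }
    .sortBy(_._1)

joined.foreach(println)
```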
Cheers,
Dylan.


On 22 October 2015 at 17:35, Sampo Niskanen 
> wrote:
Hi,

I have analytics data with timestamps on each element.  I'd like to analyze 
consecutive elements using Spark, but haven't figured out how to do this.

Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E] to 
an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze 
time-related elements.)

How can this be achieved?


Sampo Niskanen
Lead developer / Wellmo

sampo.niska...@wellmo.com
+358 40 820 5291







Spark groupby and agg inconsistent and missing data

2015-10-22 Thread Saif.A.Ellafi
Hello everyone,

I am doing some analytics experiments under a 4 server stand-alone cluster in a 
spark shell, mostly involving a huge database with groupBy and aggregations.

I am picking 6 groupBy columns and returning various aggregated results in a 
dataframe. GroupBy fields are of two types, most of them are StringType and the 
rest are LongType.

The data source is a DataFrame read from a split JSON file. Once the data is 
persisted, the result is consistent. But if I unload the memory and reload the 
data, the groupBy action returns different results, with data missing.

Could I be missing something? This is rather serious for my analytics, and I am 
not sure how to properly diagnose this situation.

Thanks,
Saif



[jira] Ankit shared "SPARK-11213: Documentation for remote spark Submit for R Scripts from 1.5 on CDH 5.4" with you

2015-10-22 Thread Ankit (JIRA)
Ankit shared an issue with you
---



> Documentation for remote spark Submit for R Scripts from 1.5 on CDH 5.4
> ---
>
> Key: SPARK-11213
> URL: https://issues.apache.org/jira/browse/SPARK-11213
> Project: Spark
>  Issue Type: Bug
>Reporter: Ankit
>
> Hello Guys,
> We have Cloudera distribution 5.4, and it ships with Spark 1.3.
> Issue:
> We have data scientists who work with R scripts, so I was searching for a way 
> to submit an R script using Oozie or a local spark-submit to a remote YARN 
> resource manager. Can anyone share the steps to do this? It is really 
> difficult to guess the steps.
> Thanks in advance







Re: Spark groupby and agg inconsistent and missing data

2015-10-22 Thread Xiao Li
Hi, Saif,

Could you post your code here? It might help others reproduce the errors
and give you a correct answer.

Thanks,

Xiao Li

2015-10-22 8:27 GMT-07:00 :

> Hello everyone,
>
> I am doing some analytics experiments under a 4 server stand-alone cluster
> in a spark shell, mostly involving a huge database with groupBy and
> aggregations.
>
> I am picking 6 groupBy columns and returning various aggregated results in
> a dataframe. GroupBy fields are of two types, most of them are StringType
> and the rest are LongType.
>
> The data source is a splitted json file dataframe,  once the data is
> persisted, the result is consistent. But if I unload the memory and reload
> the data, the groupBy action returns different content results, missing
> data.
>
> Could I be missing something? this is rather serious for my analytics, and
> not sure how to properly diagnose this situation.
>
> Thanks,
> Saif
>
>


Re: Analyzing consecutive elements

2015-10-22 Thread Sampo Niskanen
Hi,

Excellent, the sliding method seems to be just what I'm looking for.  Hope
it becomes part of the stable API, I'd assume there to be lots of uses with
time-related data.

Dylan's suggestion seems reasonable as well, if DeveloperApi is not an
option.

Thanks!


Best regards,

*Sampo Niskanen*

*Lead developer / Wellmo*
sampo.niska...@wellmo.com
+358 40 820 5291


On Thu, Oct 22, 2015 at 3:51 PM, Andrianasolo Fanilo <
fanilo.andrianas...@worldline.com> wrote:

> Hi Sampo,
>
>
>
> There is a sliding method you could try inside the 
> org.apache.spark.mllib.rdd.RDDFunctions package, though it’s DeveloperApi 
> stuff 
> (https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.mllib.rdd.RDDFunctions)
>
>
>
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.mllib.rdd.RDDFunctions._
>
> object Test {
>
>   def main(args: Array[String]): Unit = {
>     val sparkConf = new SparkConf()
>     sparkConf.setMaster("local").setAppName("sandbox")
>
>     val sc = new SparkContext(sparkConf)
>
>     val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
>     val rdd = sc.parallelize(arr)
>     val sorted = rdd.sortByKey(true)
>
>     print(sorted.sliding(2).map(x => (x(0), x(1))).collect().toSeq)
>
>     sc.stop()
>   }
> }
>
>
>
> prints
>
>
>
> WrappedArray(((1,A),(3,B)), ((3,B),(7,C)), ((7,C),(8,D)), ((8,D),(9,E)))
>
>
>
> Otherwise you could try to convert your RDD to a DataFrame then use windowing 
> functions in SparkSQL with the LEAD/LAG functions.
>
>
>
> Best regards,
>
> Fanilo
>
>
>
>
>
> From: Dylan Hogg [mailto:dylanh...@gmail.com]
> Sent: Thursday, October 22, 2015 13:44
> To: Sampo Niskanen
> Cc: user
> Subject: Re: Analyzing consecutive elements
>
>
>
> Hi Sampo,
>
> You could try zipWithIndex followed by a self join with shifted index
> values like this:
>
> val arr = Array((1, "A"), (8, "D"), (7, "C"), (3, "B"), (9, "E"))
> val rdd = sc.parallelize(arr)
> val sorted = rdd.sortByKey(true)
>
> val zipped = sorted.zipWithIndex.map(x => (x._2, x._1))
> val pairs = zipped.join(zipped.map(x => (x._1 - 1, x._2))).sortBy(_._1)
>
> Which produces the consecutive elements as pairs in the RDD for further
> processing:
> (0,((1,A),(3,B)))
> (1,((3,B),(7,C)))
> (2,((7,C),(8,D)))
> (3,((8,D),(9,E)))
>
> There are probably more efficient ways to do this, but if your dataset
> isn't too big it should work for you.
>
> Cheers,
>
> Dylan.
>
>
>
>
>
> On 22 October 2015 at 17:35, Sampo Niskanen 
> wrote:
>
> Hi,
>
>
>
> I have analytics data with timestamps on each element.  I'd like to
> analyze consecutive elements using Spark, but haven't figured out how to do
> this.
>
>
>
> Essentially what I'd want is a transform from a sorted RDD [A, B, C, D, E]
> to an RDD [(A,B), (B,C), (C,D), (D,E)].  (Or some other way to analyze
> time-related elements.)
>
>
>
> How can this be achieved?
>
>
>
>
> *Sampo Niskanen*
> *Lead developer / Wellmo*
>
> sampo.niska...@wellmo.com
> +358 40 820 5291
>
>
>
>
>


Re: [jira] Ankit shared "SPARK-11213: Documentation for remote spark Submit for R Scripts from 1.5 on CDH 5.4" with you

2015-10-22 Thread Anubhav Agarwal
Hi Ankit,
Here is my solution for this:-

1) Download the latest Spark 1.5.1 (I just copied the following link from
spark.apache.org; if it doesn't work, then grab a new one from the website).
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.5.1-bin-hadoop2.6.tgz

2) Unzip the folder and rename/move the folder according to your wish.

3) To run the Spark job now, first we need to setup HADOOP_CONF_DIR ,
YARN_CONF_DIR and SPARK_HOME.

The following is my bash script to run the jar.
export HADOOP_CONF_DIR=/etc/hadoop/conf
export YARN_CONF_DIR=/etc/spark/conf.cloudera.spark_on_yarn/yarn-conf
echo $YARN_CONF_DIR
export SPARK_HOME=/hadoop/user/ooxpdeva/spark151
echo $SPARK_HOME
$SPARK_HOME/bin/spark-submit --verbose --class anubhav.Main \
  --master yarn-client --num-executors 7 --driver-memory 6g \
  --executor-memory 6g --executor-cores 8 --queue deva \
  --conf "spark.executor.extraJavaOptions=-Dlog4j.configuration=file:/hadoop/user/deva/log4j.properties" \
  /hadoop/user/deva/anubhav.jar arg1 arg2


Hope this helps you.
Regards,
Anubhav


On Thu, Oct 22, 2015 at 11:39 AM, Ankit (JIRA)  wrote:

> Ankit shared an issue with you
> ---
>
>
>
> > Documentation for remote spark Submit for R Scripts from 1.5 on CDH 5.4
> > ---
> >
> > Key: SPARK-11213
> > URL: https://issues.apache.org/jira/browse/SPARK-11213
> > Project: Spark
> >  Issue Type: Bug
> >Reporter: Ankit
> >
> > Hello Guys,
> > We have a Cloudera Dist 5.4 ad it has spark 1.3 version
> > Issue
> > we have data sciencetis work on R Script so was searching a ways to
> submit a r script using ozie or local spark submit to a remoter Yarn
> resource manager can anyone share the steps to do the same it really
> difficult to guess the steps ,
> > Thanks in advance
>
>
>
>
>
>


[Spark Streaming] How do we reset the updateStateByKey values.

2015-10-22 Thread Uthayan Suthakar
Hello guys,

I have a streaming job that carries out computations and updates the state
(SUMs the values). At some point, I would like to reset the state. I could
drop the state by returning 'None', but I don't want to drop it. I would like
to keep the state but overwrite its value.


For example:

JavaPairDStream updatedResultsState =
streamLogs.updateStateByKey(updateResultsStream);

Under some condition, I would like to update the state by key but with
different values, hence a different update function.


e.g.

 updatedResultsState = newData.updateStateByKey(resetResultsStream);

But the state from newData.updateStateByKey cannot replace the state
in streamLogs.updateStateByKey. Do you know how I could replace the state
value in streamLogs with newData?

Is this possible?
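
One possible approach, sketched here in Scala rather than Java and not taken from the thread: keep a single updateStateByKey, and let the values themselves say whether they add to the sum or overwrite it. The Event, Add and Reset types and the updateOrReset name are hypothetical. The sketch also assumes the two streams could be mapped to a common event type and unioned before the update, and that events within a batch are applied in the order they appear in the Seq.

```scala
// Hypothetical event type: either an increment or a reset carrying the new value.
sealed trait Event
final case class Add(value: Long) extends Event
final case class Reset(to: Long) extends Event

// Same shape as the function passed to updateStateByKey in the Scala API:
// (values seen for the key in this batch, previous state) => new state.
// Returning Some(...) keeps the key; returning None would drop it.
def updateOrReset(events: Seq[Event], state: Option[Long]): Option[Long] = {
  val result = events.foldLeft(state.getOrElse(0L)) {
    case (_, Reset(to)) => to       // overwrite the running sum
    case (sum, Add(v))  => sum + v  // normal accumulation
  }
  Some(result)
}

println(updateOrReset(Seq(Add(1L), Add(2L)), Some(10L)))
println(updateOrReset(Seq(Add(5L), Reset(0L), Add(2L)), Some(10L)))
```

With this shape the state is never dropped; a reset is just another value flowing through the same update function.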


Re: Spark 1.5.1+Hadoop2.6 .. unable to write to S3 (HADOOP-12420)

2015-10-22 Thread Igor Berman
Many people use it.
How do you add the AWS SDK to the classpath?
Check in the Environment tab of the UI what is on the classpath.
You should make sure that the version on your classpath is compatible with
the one that Spark was compiled with.
I think 1.7.4 is compatible (at least we use it).

Make sure that you don't get other versions from other transitive
dependencies (check the effective POM).

On 22 October 2015 at 17:12, Ashish Shrowty 
wrote:

> I understand that there is some incompatibility with the API between Hadoop
> 2.6/2.7 and Amazon AWS SDK where they changed a signature of
>
> com.amazonaws.services.s3.transfer.TransferManagerConfiguration.setMultipartUploadThreshold.
> The JIRA indicates that this would be fixed in Hadoop 2.8.
> (https://issues.apache.org/jira/browse/HADOOP-12420)
>
> My question is - what are people doing today to access S3? I am unable to
> find an older JAR of the AWS SDK to test with.
>
> Thanks,
> Ashish
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-5-1-Hadoop2-6-unable-to-write-to-S3-HADOOP-12420-tp25163.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
>
>


Re: spark streaming failing to replicate blocks

2015-10-22 Thread Eugen Cepoi
Huh indeed this worked, thanks. Do you know why this happens, is that some
known issue?

Thanks,
Eugen

2015-10-22 19:08 GMT+07:00 Akhil Das :

> Can you try fixing spark.blockManager.port to specific port and see if the
> issue exists?
>
> Thanks
> Best Regards
>
> On Mon, Oct 19, 2015 at 6:21 PM, Eugen Cepoi 
> wrote:
>
>> Hi,
>>
>> I am running spark streaming 1.4.1 on EMR (AMI 3.9) over YARN.
>> The job is reading data from Kinesis and the batch size is of 30s (I used
>> the same value for the kinesis checkpointing).
>> In the executor logs I can see every 5 seconds a sequence of stacktraces
>> indicating that the block replication failed. I am using the default
>> storage level MEMORY_AND_DISK_SER_2.
>> WAL is not enabled nor checkpointing (the checkpoint dir is configured
>> for the spark context but not for the streaming context).
>>
>> Here is an example of those logs for ip-10-63-160-18. They occur in every
>> executor while trying to replicate to any other executor.
>>
>>
>> 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
>> [ip-10-63-160-18.ec2.internal/10.63.160.18:50929]
>> 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection to 
>> ip-10-63-160-18.ec2.internal/10.63.160.18:50929
>> java.net.ConnectException: Connection refused
>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>  at 
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>  at 
>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>  at 
>> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>> 15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending 
>> message.
>> java.net.ConnectException: Connection refused
>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>  at 
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>  at 
>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>  at 
>> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>> 15/10/19 03:11:55 INFO nio.ConnectionManager: Notifying 
>> ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
>> 15/10/19 03:11:55 INFO nio.ConnectionManager: Handling connection error on 
>> connection to ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
>> 15/10/19 03:11:55 WARN storage.BlockManager: Failed to replicate 
>> input-1-144524231 to BlockManagerId(3, ip-10-159-151-22.ec2.internal, 
>> 50929), failure #0
>> java.net.ConnectException: Connection refused
>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>  at 
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>  at 
>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>  at 
>> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>> 15/10/19 03:11:55 INFO nio.ConnectionManager: Removing SendingConnection to 
>> ConnectionManagerId(ip-10-63-160-18.ec2.internal,50929)
>> 15/10/19 03:11:55 INFO nio.SendingConnection: Initiating connection to 
>> [ip-10-63-160-18.ec2.internal/10.63.160.18:39506]
>> 15/10/19 03:11:55 WARN nio.SendingConnection: Error finishing connection to 
>> ip-10-63-160-18.ec2.internal/10.63.160.18:39506
>> java.net.ConnectException: Connection refused
>>  at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>  at 
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739)
>>  at 
>> org.apache.spark.network.nio.SendingConnection.finishConnect(Connection.scala:344)
>>  at 
>> org.apache.spark.network.nio.ConnectionManager$$anon$10.run(ConnectionManager.scala:292)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>  at 
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>  at java.lang.Thread.run(Thread.java:745)
>> 15/10/19 03:11:55 ERROR nio.ConnectionManager: Exception while sending 
>> message.
>> java.net.ConnectException: Connection refused
>>  at 

Python worker exited unexpectedly (crashed)

2015-10-22 Thread shahid
Hi 

I am running a 10-node standalone cluster on AWS
with 100 GB of data loaded on HDFS. First I do a groupBy operation,
and then I generate pairs from the grouped RDD, e.g. (key,[a1,b1]),(key,[a,b,c]),
generating pairs like
(a1,b1),(a,b),(a,c) ... n
The pair RDD will get large in size.
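
A group of n values produces n(n-1)/2 pairs, so one large group can dominate
memory if its pairs are built as a list. Below is a minimal sketch in plain
Python of generating the pairs lazily. The function names are made up for
illustration, and whether memory pressure is what actually killed the Python
worker is an assumption, not something the trace below proves.

```python
from itertools import combinations


def pairs_for_group(values):
    """Return a lazy iterator over every unordered pair in one group."""
    # combinations() yields pairs one at a time, so the full list of
    # pairs for a group is never held in memory at once.
    return combinations(values, 2)


def pairs_for_groups(grouped):
    """Flatten (key, values) records into a stream of pairs."""
    for _key, values in grouped:
        for pair in pairs_for_group(values):
            yield pair
```

On an RDD of (key, values) records the same idea would be a flatMap whose
function returns combinations(values, 2) instead of a fully built list.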

Some stats from the UI at the point where errors start, before the script finally fails:
Details for Stage 1 (Attempt 0)
Total Time Across All Tasks: 1.3 h
Shuffle Read: 4.4 GB / 1402058
Shuffle Spill (Memory): 73.1 GB
Shuffle Spill (Disk): 3.6 GB

I get the following stack trace:

WARN scheduler.TaskSetManager: Lost task 0.3 in stage 1.0 (TID 943,
10.239.131.154): org.apache.spark.SparkException: Python worker exited
unexpectedly (crashed)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:175)
at
org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
at java.io.DataInputStream.readInt(DataInputStream.java:392)
at 
org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:111)
... 10 more

15/10/22 16:30:17 ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed
4 times; aborting job
15/10/22 16:30:17 INFO scheduler.TaskSchedulerImpl: Cancelling stage 1



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Python-worker-exited-unexpectedly-crashed-tp25164.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




RE: Spark groupby and agg inconsistent and missing data

2015-10-22 Thread Saif.A.Ellafi
Thanks. Sorry, I cannot share the data, and I am not sure how significant it 
would be for you.
I am reproducing the issue on a smaller piece of the content to see whether I 
can find a reason for the inconsistency.

val res2 = data.filter($"closed" === $"ever_closed")
  .groupBy("product", "band ", "aget", "vine", "time", "mm")
  .agg(count($"account_id").as("N"),
    sum($"balance").as("balance"),
    sum($"spend").as("spend"),
    sum($"payment").as("payment"))
  .persist()

then I collect distinct values of “vine” (which is StringType) both from data 
and res2, and res2 is missing a lot of values:

val t1 = res2.select("vine").distinct.collect
scala> t1.size
res10: Int = 617

val t_real = data.select("vine").distinct.collect
scala> t_real.size
res9: Int = 639


From: Xiao Li [mailto:gatorsm...@gmail.com]
Sent: Thursday, October 22, 2015 12:45 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: Spark groupby and agg inconsistent and missing data

Hi, Saif,

Could you post your code here? It might help others reproduce the errors and 
give you a correct answer.

Thanks,

Xiao Li

2015-10-22 8:27 GMT-07:00:
Hello everyone,

I am doing some analytics experiments on a 4-server standalone cluster in a 
spark shell, mostly involving a huge database with groupBy and aggregations.

I am picking 6 groupBy columns and returning various aggregated results in a 
dataframe. GroupBy fields are of two types: most of them are StringType and the 
rest are LongType.

The data source is a split JSON file loaded as a dataframe. Once the data is 
persisted, the result is consistent. But if I unload the memory and reload the 
data, the groupBy action returns different results, with data missing.

Could I be missing something? This is rather serious for my analytics, and I am 
not sure how to properly diagnose this situation.

Thanks,
Saif




Spark SQL: Issues with using DirectParquetOutputCommitter with APPEND mode and OVERWRITE mode

2015-10-22 Thread Jerry Lam
Hi Spark users and developers,

I read the ticket [SPARK-8578] (Should ignore user defined output committer
when appending data), which ignores DirectParquetOutputCommitter if append
mode is selected. The logic was that it is unsafe to use because it is not
possible to revert a failed job in append mode using
DirectParquetOutputCommitter. Wouldn't it be better to allow users to
use it at their own risk? Say, if you use DirectParquetOutputCommitter with
append mode, the job fails immediately when a task fails. The user can then
choose to reprocess the job entirely, which is not a big deal since failure
is rare in most cases. Another approach is to provide at-least-once task
semantics for append mode using DirectParquetOutputCommitter. This will end
up producing duplicate records, but for some applications this is fine.

The second issue is that the assumption that Overwrite mode works with
DirectParquetOutputCommitter in all cases is wrong, at least from the
perspective of using it with S3. S3 provides eventual consistency for
overwrite PUTS and DELETES. So if you attempt to delete a directory and
then create the same directory again within a split second, the chance you
hit org.apache.hadoop.fs.FileAlreadyExistsException is very high, because
deletes don't take effect immediately and creating the same file before it is
deleted will result in the above exception. Might I propose changing the code
such that it will actually OVERWRITE the file instead of doing a delete
followed by a create?

Best Regards,

Jerry


Spark 1.5 on CDH 5.4.0

2015-10-22 Thread Deenar Toraskar
Hi,

I have got the prebuilt version of Spark 1.5 for Hadoop 2.6 (
http://www.apache.org/dyn/closer.lua/spark/spark-1.5.1/spark-1.5.1-bin-hadoop2.6.tgz)
working with CDH 5.4.0 in local mode on a cluster with Kerberos. It works
well, including connecting to the Hive metastore. I am facing an issue
running Spark jobs in yarn-client/yarn-cluster mode. The executors fail to
start as Java cannot find ExecutorLauncher.

Error: Could not find or load main class org.apache.spark.deploy.yarn.ExecutorLauncher
client token: N/A
diagnostics: Application application_1443531450011_13437 failed 2 times due to AM
Container for appattempt_1443531450011_13437_02 exited with exitCode: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:538)
at org.apache.hadoop.util.Shell.run(Shell.java:455)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
at org.apache.hadoop.yarn.server.nodemanager.LinuxContainerExecutor.launchContainer(LinuxContainerExecutor.java:293)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)

Any ideas as to what might be going wrong? Also, how can I turn on more
detailed logging to see what command line is being run by Yarn to launch
containers?

Regards
Deenar


Re: Error in starting Spark Streaming Context

2015-10-22 Thread Tiago Albineli Motta
Solved!

The problem has nothing to do with the class and object refactoring. But in
the process of this refactoring I made a change that is similar to your code.

Before the refactoring, I processed the DStream inside the function that I
passed to StreamingContext.getOrCreate. Afterwards, I started processing the
DStream using the context returned from StreamingContext.getOrCreate.

So you should call *fetchTweets* inside *managingContext*.

That worked for me.
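
The pattern described above can be sketched as follows, in PySpark rather
than the Scala of the original code. This is only an illustration under
assumptions: the socket source, host, port, durations, and function names are
made up, and the pyspark imports sit inside the functions so the sketch loads
without Spark installed. The point is that every DStream operation is declared
inside the function handed to getOrCreate, so it is part of the graph that is
checkpointed and restored.

```python
def create_context(checkpoint_dir, host="localhost", port=9999):
    """Build a fresh StreamingContext with the whole DStream graph inside."""
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="GetOrCreateSketch")
    ssc = StreamingContext(sc, 10)  # 10-second batches
    ssc.checkpoint(checkpoint_dir)

    # All transformations and output operations are declared here, before
    # the context is returned, so a restart can recover them.
    lines = ssc.socketTextStream(host, port)
    lines.window(30, 10).count().pprint()
    return ssc


def run(checkpoint_dir):
    """Start from the checkpoint if one exists, otherwise build a new context."""
    from pyspark.streaming import StreamingContext

    # No DStream operations are added after getOrCreate returns.
    ssc = StreamingContext.getOrCreate(
        checkpoint_dir, lambda: create_context(checkpoint_dir)
    )
    ssc.start()
    ssc.awaitTermination()
```

Declaring operations on the returned context instead would leave them out of
the recovered graph on restart, which matches the "has not been initialized"
error quoted below.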

Tiago





Tiago Albineli Motta
Desenvolvedor de Software - Globo.com
ICQ: 32107100
http://programandosemcafeina.blogspot.com

On Thu, Oct 22, 2015 at 11:22 AM, Tiago Albineli Motta 
wrote:

> I can't say what is happening, but I have a similar problem here.
>
> While for you the source is:
>
> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
> initialized
>
>
> For me it is:
>
> org.apache.spark.SparkException: 
> org.apache.spark.streaming.dstream.MapPartitionedDStream@7a2d07cc has not 
> been initialized
>
>
> Here, the problem started after I changed my main class to use another class 
> to execute the stream.
>
>
> Before:
>
>
> object TopStream {
>
>  //everything here
>
> }
>
>
> After
>
>
> object TopStream {
>
>// call new TopStream.process( ... )
>
> }
>
>
> class TopStream extends Serializable {
>
> }
>
>
>
>
>
> Tiago Albineli Motta
> Desenvolvedor de Software - Globo.com
> ICQ: 32107100
> http://programandosemcafeina.blogspot.com
>
> On Wed, Jul 29, 2015 at 12:59 PM, Sadaf  wrote:
>
>> Hi
>>
>> I am new to Spark Streaming and am writing code for a Twitter connector.
>> When I run this code more than once, it gives the following exception. I have
>> to create a new HDFS directory for checkpointing each time to make it run
>> successfully, and moreover it doesn't stop.
>>
>> ERROR StreamingContext: Error starting the context, marking it as stopped
>> org.apache.spark.SparkException:
>> org.apache.spark.streaming.dstream.WindowedDStream@532d0784 has not been
>> initialized
>> at
>> org.apache.spark.streaming.dstream.DStream.isTimeValid(DStream.scala:321)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at
>>
>> org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:342)
>> at scala.Option.orElse(Option.scala:257)
>> at
>> org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:339)
>> at
>>
>> org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:38)
>> at
>>
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>> at
>>
>> org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:120)
>> at
>>
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> at
>>
>> scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
>> at
>>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>> scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
>> at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
>> at
>>
>> org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:120)
>> at
>>
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:227)
>> at
>>
>> org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$restart$4.apply(JobGenerator.scala:222)
>> at
>>
>> scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>> at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>> at
>>
>> org.apache.spark.streaming.scheduler.JobGenerator.restart(JobGenerator.scala:222)
>> at
>>
>> org.apache.spark.streaming.scheduler.JobGenerator.start(JobGenerator.scala:92)
>> at
>>
>> org.apache.spark.streaming.scheduler.JobScheduler.start(JobScheduler.scala:73)
>> at
>>
>> org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:588)
>> at
>>
>> org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:586)
>> at twitter.streamingSpark$.twitterConnector(App.scala:38)
>> at twitter.streamingSpark$.main(App.scala:26)
>> at twitter.streamingSpark.main(App.scala)
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> at
>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> at
>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> at java.lang.reflect.Method.invoke(Method.java:606)
>> at
>>
>> org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:664)
>> at
>> org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:169)
>> 

RE: Spark groupby and agg inconsistent and missing data

2015-10-22 Thread Saif.A.Ellafi
Never mind my last email. res2 is filtered, so my test does not make sense. The 
issue is not reproduced there. I have the problem somewhere else.
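
To make the reasoning concrete: a filter applied before the groupBy can remove
every row for some group values, so the distinct count after filtering is
expected to be lower. Below is a minimal sketch in plain Python. The rows are
made up for illustration and are not from the real data set.

```python
# Made-up rows; only the column names follow the query quoted below.
rows = [
    {"vine": "2014Q1", "closed": 1, "ever_closed": 1},
    {"vine": "2014Q2", "closed": 0, "ever_closed": 1},
    {"vine": "2014Q2", "closed": 1, "ever_closed": 1},
    {"vine": "2014Q3", "closed": 0, "ever_closed": 1},
]


def distinct_vines(records):
    """Collect the distinct 'vine' values of the given records."""
    return {record["vine"] for record in records}


# Same predicate as the filter in the query: closed === ever_closed.
filtered = [r for r in rows if r["closed"] == r["ever_closed"]]

all_vines = distinct_vines(rows)
kept_vines = distinct_vines(filtered)
missing = all_vines - kept_vines  # values whose rows were all filtered out
```

Here every row for "2014Q3" fails the predicate, so that value disappears from
the filtered result without any data being lost by the aggregation.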

From: Ellafi, Saif A.
Sent: Thursday, October 22, 2015 12:57 PM
To: 'Xiao Li'
Cc: user
Subject: RE: Spark groupby and agg inconsistent and missing data

Thanks. Sorry, I cannot share the data, and I am not sure how significant it 
would be for you.
I am reproducing the issue on a smaller piece of the content to see whether I 
can find a reason for the inconsistency.

val res2 = data.filter($"closed" === $"ever_closed")
  .groupBy("product", "band ", "aget", "vine", "time", "mm")
  .agg(count($"account_id").as("N"),
    sum($"balance").as("balance"),
    sum($"spend").as("spend"),
    sum($"payment").as("payment"))
  .persist()

then I collect distinct values of “vine” (which is StringType) both from data 
and res2, and res2 is missing a lot of values:

val t1 = res2.select("vine").distinct.collect
scala> t1.size
res10: Int = 617

val t_real = data.select("vine").distinct.collect
scala> t_real.size
res9: Int = 639


From: Xiao Li [mailto:gatorsm...@gmail.com]
Sent: Thursday, October 22, 2015 12:45 PM
To: Ellafi, Saif A.
Cc: user
Subject: Re: Spark groupby and agg inconsistent and missing data

Hi, Saif,

Could you post your code here? It might help others reproduce the errors and 
give you a correct answer.

Thanks,

Xiao Li

2015-10-22 8:27 GMT-07:00:
Hello everyone,

I am doing some analytics experiments on a 4-server standalone cluster in a 
spark shell, mostly involving a huge database with groupBy and aggregations.

I am picking 6 groupBy columns and returning various aggregated results in a 
dataframe. GroupBy fields are of two types: most of them are StringType and the 
rest are LongType.

The data source is a split JSON file loaded as a dataframe. Once the data is 
persisted, the result is consistent. But if I unload the memory and reload the 
data, the groupBy action returns different results, with data missing.

Could I be missing something? This is rather serious for my analytics, and I am 
not sure how to properly diagnose this situation.

Thanks,
Saif




Maven Repository Hosting for Spark SQL 1.5.1

2015-10-22 Thread William Li

Hi - I tried to download Spark SQL (Scala 2.10 build), version 1.5.1, from 
IntelliJ using the Maven library:

-Project Structure
-Global Library, click on the + to select Maven Repository
-Type in org.apache.spark to see the list.
-The list result only shows versions up to spark-sql_2.10-1.1.1
-I tried to manually type in the version 1.5.1 but it doesn't download the 
correct list of files needed.

I can't see the 1.5.1 version. So there seems to be some problem, because 
IntelliJ reported that org.apache.spark:spark-sql_2.10-1.5.1 is not available 
or not complete.

Does anyone know who to contact to verify that?


Thanks,

William.


Fwd: multiple pyspark instances simultaneously (same time)

2015-10-22 Thread Jeff Sadowski
On Thu, Oct 22, 2015 at 5:40 AM, Akhil Das 
wrote:

> Did you read
> https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application
>

I did.

I had set the option

spark.scheduler.mode FAIR

in conf/spark-defaults.conf
and
created fairscheduler.xml
with the two pools production and test

and noticed that when I start pyspark,

sc.setLocalProperty("spark.scheduler.pool", null)

does not work. It gives me
NameError: name 'null' is not defined
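
The call in the scheduling docs is written in Scala; Python has no null, and
the equivalent value is None. Below is a minimal sketch using a hypothetical
stand-in for the context's local properties. FakeContext is not a Spark class;
on a live SparkContext the call would be
sc.setLocalProperty("spark.scheduler.pool", None). The stand-in mirrors the
documented behaviour that a null value clears the pool for the current thread.

```python
class FakeContext:
    """Hypothetical stand-in for a SparkContext's per-thread local properties."""

    def __init__(self):
        self._local = {}

    def setLocalProperty(self, key, value):
        # A None value clears the property instead of storing it.
        if value is None:
            self._local.pop(key, None)
        else:
            self._local[key] = value

    def getLocalProperty(self, key):
        return self._local.get(key)


sc = FakeContext()
sc.setLocalProperty("spark.scheduler.pool", "production")
pool_while_set = sc.getLocalProperty("spark.scheduler.pool")

sc.setLocalProperty("spark.scheduler.pool", None)  # Python spelling of null
pool_after_clear = sc.getLocalProperty("spark.scheduler.pool")
```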

I tried getting into the production pool so I would have FAIR scheduling

sc.setLocalProperty("spark.scheduler.pool", "production")

and

sc.getLocalProperty("spark.scheduler.pool")

shows

u'production'

I also noticed I could join pools that are not created, and it shows that I
am in that uncreated pool, as if

sc.setLocalProperty("spark.scheduler.pool", "production")

wasn't really doing anything.

And as I was saying, it still behaves as if it isn't doing FAIR scheduling
when I am in the production pool.

When I start pyspark as a second user and do

sc.setLocalProperty("spark.scheduler.pool", "production")

it still says waiting on the master's status page,

and still gives me

Initial job has not accepted any resources

if I try to do something as that second user.



> Thanks
> Best Regards
>
> On Thu, Oct 15, 2015 at 11:31 PM, jeff.sadow...@gmail.com <
> jeff.sadow...@gmail.com> wrote:
>
>> I am having issues trying to setup spark to run jobs simultaneously.
>>
>> I thought I wanted FAIR scheduling?
>>
>> I used the template fairscheduler.xml as is. When I start pyspark I see the
>> 3 expected pools:
>> production, test, and default
>>
>> when I login as second user and run pyspark
>> I see the expected pools as that user as well
>>
>> when I open up a webbrowser to http://master:8080
>>
>> I see my first user's state is running and my second user's state is
>> waiting
>>
>> so I try putting them both in the production pool which is fair scheduler
>>
>> When I refresh http://master:8080
>>
>> the second user's status is still waiting.
>>
>> If I try to run something as the second user I get
>>
>> "Initial job has not accepted any resources"
>>
>> Maybe fair queuing is not what I want?
>>
>> I'm starting pyspark as follows
>>
>> pyspark --master spark://master:7077
>>
>> I started spark as follows
>>
>> start-all.sh
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/multiple-pyspark-instances-simultaneously-same-time-tp25079.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>>
>


Re: Large number of conf broadcasts

2015-10-22 Thread Koert Kuipers
I am seeing the same thing. It's gone completely crazy creating broadcasts
for the last 15 mins or so. Killing it...

On Thu, Sep 24, 2015 at 1:24 PM, Anders Arpteg  wrote:

> Hi,
>
> Running spark 1.5.0 in yarn-client mode, and am curious why there are so
> many broadcasts being done when loading datasets with a large number of
> partitions/files. Have datasets with thousands of partitions, i.e. hdfs
> files in the avro folder, and sometimes load hundreds of these large
> datasets. Believe I have located the broadcast at line
> SparkContext.scala:1006. It seems to just broadcast the hadoop
> configuration, and I don't see why it should be necessary to broadcast that
> for EVERY file? Wouldn't it be possible to reuse the same broadcast
> configuration? It is hardly the case that the configuration would be different
> between each file in a single dataset. Seems to be wasting lots of memory
> and needs to persist unnecessarily to disk (see below again).
>
> Thanks,
> Anders
>
> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1871_piece0
> to disk
> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_1871_piece0 on disk on
> 10.254.35.24:49428 (size: 23.1 KB)
> 15/09/24 17:11:11 INFO MemoryStore: Block broadcast_4803_piece0 stored as
> bytes in memory (estimated size 23.1 KB, free 2.4 KB)
> 15/09/24 17:11:11 INFO BlockManagerInfo: Added broadcast_4803_piece0 in
> memory on 10.254.35.24:49428 (size: 23.1 KB, free: 464.0 MB)
> 15/09/24 17:11:11 INFO SpotifySparkContext: Created broadcast 4803 from
> hadoopFile at AvroRelation.scala:121
> 15/09/24 17:11:11 WARN MemoryStore: Failed to reserve initial memory
> threshold of 1024.0 KB for computing block broadcast_4804 in memory.
> 15/09/24 17:11:11 WARN MemoryStore: Not enough space to cache
> broadcast_4804 in memory! (computed 496.0 B so far)
> 15/09/24 17:11:11 INFO MemoryStore: Memory use = 530.3 MB (blocks) + 0.0 B
> (scratch space shared across 0 tasks(s)) = 530.3 MB. Storage
> limit = 530.3 MB.
> 15/09/24 17:11:11 WARN MemoryStore: Persisting block broadcast_4804 to
> disk instead.
> 15/09/24 17:11:11 INFO MemoryStore: ensureFreeSpace(23703) called with
> curMem=556036460, maxMem=556038881
> 15/09/24 17:11:11 INFO MemoryStore: 1 blocks selected for dropping
> 15/09/24 17:11:11 INFO BlockManager: Dropping block broadcast_1872_piece0
> from memory
> 15/09/24 17:11:11 INFO BlockManager: Writing block broadcast_1872_piece0
> to disk
>
>