RE: Issue when rebroadcasting a variable outside of the definition scope

2015-08-07 Thread Ganelin, Ilya
Simone, here are some thoughts. First, please check out the "understanding closures" 
section of the Spark Programming Guide. Secondly, broadcast variables do not 
propagate updates to the underlying data. You must either create a new 
broadcast variable or, alternatively, if you simply wish to accumulate results, 
use an Accumulator that stores an array or queue as a buffer that you then 
read from and write to Kafka.

You should also be able to send the results to a new DStream instead, and link 
that DStream to Kafka. Hope this gives you some ideas to play with. Thanks!



Thank you,
Ilya Ganelin



-Original Message-
From: simone.robutti [simone.robu...@gmail.com]
Sent: Friday, August 07, 2015 10:07 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Issue when rebroadcasting a variable outside of the definition scope


Hello everyone,

this is my first message ever to a mailing list so please pardon me if for
some reason I'm violating the etiquette.

I have a problem with rebroadcasting a variable. How it should work is not
well documented, so I could only find a few simple examples to understand
how it should work.

What I'm trying to do is to propagate an update to the options controlling the
behaviour of my streaming transformations (in this case, the evaluation of
machine learning models). I have a listener on a Kafka queue that waits for
messages and updates the broadcast variable.

I made it work, but the system doesn't rebroadcast anything if I pass the
DStream or the broadcast variable as a parameter.

So they must both be defined in the same scope, and the rebroadcasting must
happen in that same scope. Right now my main function looks like this:
--
 var updateVar = sc.broadcast("test")
 val stream = input.map(x => myTransformation(x, updateVar))
 stream.writeToKafka[String, String](outputProps,
   (m: String) => new KeyedMessage[String, String](configuration.outputTopic, m + updateVar.value))

 val controlStream = connector.createMessageStreamsByFilter(filterSpec, 1,
   new DefaultDecoder(), new StringDecoder())(0)
 for (messageAndTopic <- controlStream) {
   println("ricevo")
   updateVar.unpersist()
   updateVar = ssc.sparkContext.broadcast(messageAndTopic.message)
 }

ssc.start()
ssc.awaitTermination()

--

updateVar is correctly updated both in myTransformation and in the main
scope and I can access the updated value.

But when I try to do this after moving the logic to a class, it fails. I have
something like this (or the same queue listener from before, but moved to
another class):

class Listener(var updateVar: Broadcast[String]) {
  ...
  def someFunc() = {
    updateVar.unpersist()
    updateVar = sc.broadcast("new value")
  }
  ...
}

This fails: the variable can be destroyed but cannot be updated.

Any suggestion on why this behaviour occurs? Also, I would like to know how
Spark notices the reassignment to the var and starts the rebroadcasting.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Issue-when-rebroadcasting-a-variable-outside-of-the-definition-scope-tp24172.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.






RE: How to read gzip data in Spark - Simple question

2015-08-05 Thread Ganelin, Ilya
Have you tried reading the spark documentation?

http://spark.apache.org/docs/latest/programming-guide.html



Thank you,
Ilya Ganelin



-Original Message-
From: ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.com]
Sent: Thursday, August 06, 2015 12:41 AM Eastern Standard Time
To: Philip Weaver
Cc: user
Subject: Re: How to read gzip data in Spark - Simple question

how do i persist the RDD to HDFS ?

On Wed, Aug 5, 2015 at 8:32 PM, Philip Weaver philip.wea...@gmail.com wrote:
This message means that java.util.Date is not supported by Spark DataFrame. 
You'll need to use java.sql.Date, I believe.
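
For example, something along these lines (an untested sketch; it assumes the same "yyyy-MM-dd" format used below):

import java.sql.Date
import java.text.SimpleDateFormat

// parse with SimpleDateFormat as before, then wrap the epoch millis in java.sql.Date,
// which Spark SQL schema inference does support
def formatStringAsSqlDate(dateStr: String): Date =
  new Date(new SimpleDateFormat("yyyy-MM-dd").parse(dateStr).getTime)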

On Wed, Aug 5, 2015 at 8:29 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
That seems to be working. However, I see a new exception.

Code:
def formatStringAsDate(dateStr: String) = new SimpleDateFormat("yyyy-MM-dd").parse(dateStr)

//(2015-07-27,12459,,31242,6,Daily,-999,2099-01-01,2099-01-02,1,0,0.1,0,1,-1,isGeo,,,204,694.0,1.9236856708701322E-4,0.0,-4.48,0.0,0.0,0.0,)
val rowStructText = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz")
case class Summary(f1: Date, f2: Long, f3: Long, f4: Integer, f5: String, f6: Integer,
  f7: Date, f8: Date, f9: Integer, f10: Integer, f11: Float, f12: Integer,
  f13: Integer, f14: String)

val summary = rowStructText.map(s => s.split(",")).map(
  s => Summary(formatStringAsDate(s(0)),
    s(1).replaceAll("\"", "").toLong,
    s(3).replaceAll("\"", "").toLong,
    s(4).replaceAll("\"", "").toInt,
    s(5).replaceAll("\"", ""),
    s(6).replaceAll("\"", "").toInt,
    formatStringAsDate(s(7)),
    formatStringAsDate(s(8)),
    s(9).replaceAll("\"", "").toInt,
    s(10).replaceAll("\"", "").toInt,
    s(11).replaceAll("\"", "").toFloat,
    s(12).replaceAll("\"", "").toInt,
    s(13).replaceAll("\"", "").toInt,
    s(14).replaceAll("\"", "")
  )
).toDF()
bank.registerTempTable("summary")


//Output
import java.text.SimpleDateFormat
import java.util.Calendar
import java.util.Date
formatStringAsDate: (dateStr: String)java.util.Date
rowStructText: org.apache.spark.rdd.RDD[String] = /user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz MapPartitionsRDD[105] at textFile at <console>:60
defined class Summary
x: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[106] at map at <console>:61
java.lang.UnsupportedOperationException: Schema for type java.util.Date is not supported
  at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:188)
  at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:164)

Any suggestions?

On Wed, Aug 5, 2015 at 8:18 PM, Philip Weaver philip.wea...@gmail.com wrote:
The parallelize method does not read the contents of a file. It simply takes a 
collection and distributes it to the cluster. In this case, the String is a 
collection of 67 characters.

Use sc.textFile instead of sc.parallelize, and it should work as you want.
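
For example (a minimal sketch using the .gz path from below; gzip isn't splittable, so each .gz file becomes a single partition):

// textFile decompresses .gz input transparently
val rows = sc.textFile("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz")
println(rows.count())          // number of CSV lines in the file
rows.take(5).foreach(println)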

On Wed, Aug 5, 2015 at 8:12 PM, ÐΞ€ρ@Ҝ (๏̯͡๏) deepuj...@gmail.com wrote:
I have csv data that is embedded in gzip format on HDFS.

With Pig

a = load '/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-3.gz' 
using PigStorage();

b = limit a 10

(2015-07-27,12459,,31243,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,203,4810370.0,1.4090459061723766,1.017458,-0.03,-0.11,0.05,0.468666,)

(2015-07-27,12459,,31241,6,Daily,-999,2099-01-01,2099-01-02,4,0,0.1,0,1,0,isGeo,,,203,7937613.0,1.1624841995932425,1.11562,-0.06,-0.15,0.03,0.233283,)


However with Spark

val rowStructText =
  sc.parallelize("/user/zeppelin/aggregatedsummary/2015/08/03/regular/part-m-0.gz")

val x = rowStructText.map(s => {
  println(s)
  s
})

x.count

Questions:

1) x.count always shows 67 irrespective of the path I change in sc.parallelize

2) It shows x as RDD[Char] instead of String

3) println() never emits the rows.

Any suggestions?

-Deepak


--
Deepak





--
Deepak





--
Deepak





RE: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

2015-07-22 Thread Ganelin, Ilya
To be unpersisted, the RDD must be persisted first. If it's set to None, then 
it's not persisted, and as such does not need to be freed. Does that make sense?



Thank you,
Ilya Ganelin



-Original Message-
From: Stahlman, Jonathan [jonathan.stahl...@capitalone.com]
Sent: Wednesday, July 22, 2015 01:42 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello again,

In trying to understand the caching of intermediate RDDs by ALS, I looked into 
the source code and found what may be a bug.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala#L230

you see that ALS.train() is being called with finalRDDStorageLevel = 
StorageLevel.NONE, which I would understand to mean that the intermediate RDDs 
will not be persisted.  Looking here:

https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala#L631

unpersist() is only being called on the intermediate RDDs (all the *Blocks RDDs 
listed in my first post) if finalRDDStorageLevel != StorageLevel.NONE.

This doesn’t make sense to me – I would expect the RDDs to be removed from the 
cache if finalRDDStorageLevel == StorageLevel.NONE, not the other way around.

Jonathan


From: Stahlman, Stahlman Jonathan jonathan.stahl...@capitalone.com
Date: Thursday, July 16, 2015 at 2:18 PM
To: user@spark.apache.org
Subject: How to unpersist RDDs generated by ALS/MatrixFactorizationModel

Hello all,

I am running the Spark recommendation algorithm in MLlib and I have been 
studying its output with various model configurations.  Ideally I would like to 
be able to run one job that trains the recommendation model with many different 
configurations to try to optimize for performance.  A sample code in python is 
copied below.

The issue I have is that each new model which is trained caches a set of RDDs 
and eventually the executors run out of memory.  Is there any way in Pyspark to 
unpersist() these RDDs after each iteration? The names of the RDDs which I 
gather from the UI are:

itemInBlocks
itemOutBlocks
Products
ratingBlocks
userInBlocks
userOutBlocks
users

I am using Spark 1.3.  Thank you for any help!

Regards,
Jonathan




  import itertools
  from pyspark.mllib.recommendation import ALS, Rating

  data_train, data_cv, data_test = data.randomSplit([99,1,1], 2)
  functions = [rating] #defined elsewhere
  ranks = [10,20]
  iterations = [10,20]
  lambdas = [0.01,0.1]
  alphas  = [1.0,50.0]

  results = []
  for ratingFunction, rank, numIterations, m_lambda, m_alpha in itertools.product( functions, ranks, iterations, lambdas, alphas ):
    #train model
    ratings_train = data_train.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    model = ALS.trainImplicit( ratings_train, rank, numIterations, lambda_=float(m_lambda), alpha=float(m_alpha) )

    #test performance on CV data
    ratings_cv = data_cv.map(lambda l: Rating( l.user, l.product, ratingFunction(l) ) )
    auc = areaUnderCurve( ratings_cv, model.predictAll )

    #save results
    result = ",".join(str(l) for l in [ratingFunction.__name__, rank, numIterations, m_lambda, m_alpha, auc])
    results.append(result)





Real-time data visualization with Zeppelin

2015-07-08 Thread Ganelin, Ilya
Hi all – I’m just wondering if anyone has had success integrating Spark 
Streaming with Zeppelin and actually dynamically updating the data in near 
real-time. From my investigation, it seems that Zeppelin will only allow you to 
display a snapshot of data, not a continuously updating table. Has anyone 
figured out if there’s a way to loop a display command or how to provide a 
mechanism to continuously update visualizations?

Thank you,
Ilya Ganelin





RE: Making Unpersist Lazy

2015-07-02 Thread Ganelin, Ilya
You may pass an optional parameter (blocking = false) to make it lazy.
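
For example (a quick sketch, assuming the RDD was persisted earlier):

val parallel = sc.parallelize(1 to 1000000).persist()
parallel.count()                       // materialize the cache
parallel.unpersist(blocking = false)   // returns immediately instead of waiting for the blocks to be dropped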



Thank you,
Ilya Ganelin



-Original Message-
From: Jem Tucker [jem.tuc...@gmail.com]
Sent: Thursday, July 02, 2015 04:06 AM Eastern Standard Time
To: Akhil Das
Cc: user
Subject: Re: Making Unpersist Lazy

Hi,

After running some tests, it appears the unpersist is called as soon as it is 
reached, so any tasks using this RDD later on will have to recalculate it. 
This is fine for simple programs, but when an RDD is created within a function 
and its reference is then lost, while children of it continue to be used, the 
persist/unpersist does not work effectively.

Thanks

Jem
On Thu, 2 Jul 2015 at 08:18, Akhil Das ak...@sigmoidanalytics.com wrote:
RDDs which are no longer required will be removed from memory by Spark itself 
(which you could consider lazy?).

Thanks
Best Regards

On Wed, Jul 1, 2015 at 7:48 PM, Jem Tucker jem.tuc...@gmail.com wrote:
Hi,

Currently, rdd.unpersist() does not appear to be executed lazily, so it must be 
placed after an action. Is there any way to emulate lazy execution of this call 
so that it is added to the task queue?

Thanks,

Jem





RE: Performing sc.paralleize (..) in workers not in the driver program

2015-06-25 Thread Ganelin, Ilya
The parallelize operation accepts as input a data structure in memory. When you 
call it, you are necessarily operating in the memory space of the driver, since 
that is where user code executes. Until you have an RDD, you can't really 
operate in a distributed way.

If your files are stored in a distributed file system such as HDFS, then you can 
create an RDD from them directly with sc.textFile(...).
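
A minimal sketch (the HDFS path is just a placeholder):

// each HDFS block becomes at least one partition, read directly by the executors
val lines = sc.textFile("hdfs:///data/input/*.txt")
println(lines.partitions.length)
println(lines.count())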



Thank you,
Ilya Ganelin



-Original Message-
From: shahab [shahab.mok...@gmail.commailto:shahab.mok...@gmail.com]
Sent: Thursday, June 25, 2015 12:46 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Performing sc.paralleize (..) in workers not in the driver program

Hi,

Apparently, the sc.parallelize(..) operation is performed in the driver program, 
not in the workers! Is it possible to do this in a worker process for the sake 
of scalability?

best
/Shahab




RE: Matrix Multiplication and mllib.recommendation

2015-06-17 Thread Ganelin, Ilya
I actually talk about this exact thing in a blog post here: 
http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/
Keep in mind, you're actually doing a ton of math. Even with proper caching 
and use of broadcast variables this will take a while depending on the size of 
your cluster. To get real results you may want to look into locality-sensitive 
hashing to limit your search space, and definitely look into spinning up 
multiple threads to process your product features in parallel to increase 
resource utilization on the cluster.
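
A rough Scala sketch of the manual approach (it assumes a trained MLlib MatrixFactorizationModel called model; the helper name and topN are illustrative, and this is not the exact code from the blog post):

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

def topNForAllUsers(model: MatrixFactorizationModel, topN: Int) = {
  // broadcast the (smaller) product-factor matrix once...
  val pfBroadcast = model.productFeatures.sparkContext.broadcast(model.productFeatures.collect())

  // ...then score every product for each user with a plain dot product
  model.userFeatures.map { case (userId, userVec) =>
    val scored = pfBroadcast.value.map { case (productId, prodVec) =>
      (productId, userVec.zip(prodVec).map { case (u, p) => u * p }.sum)
    }
    (userId, scored.sortBy(-_._2).take(topN))
  }
}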



Thank you,
Ilya Ganelin



-Original Message-
From: afarahat [ayman.fara...@yahoo.com]
Sent: Wednesday, June 17, 2015 11:16 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Matrix Multiplication and mllib.recommendation


Hello;
I am trying to get predictions after running the ALS model.
The model works fine. In the prediction/recommendation step, I have about
30,000 products and 90 million users.
When I try to predict all, it fails.
I have been trying to formulate the problem as a matrix multiplication where
I first get the product features, broadcast them, and then do a dot product.
It's still very slow. Any reason why?
Here is a sample code:

import numpy
from pyspark.mllib.recommendation import MatrixFactorizationModel

def doMultiply(x):
    a = []
    #multiply by each broadcast product-feature vector
    mylen = len(pf.value)
    for i in range(mylen):
        myprod = numpy.dot(x, pf.value[i][1])
        a.append(myprod)
    return a


myModel = MatrixFactorizationModel.load(sc, FlurryModelPath)
#I need to select which products to broadcast but lets try all
m1 = myModel.productFeatures().sample(False, 0.001)
pf = sc.broadcast(m1.collect())
uf = myModel.userFeatures()
f1 = uf.map(lambda x: (x[0], doMultiply(x[1])))



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Matrix-Multiplication-and-mllib-recommendation-tp23384.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.






RE: Does long-lived SparkContext hold on to executor resources?

2015-05-11 Thread Ganelin, Ilya
Also check out the spark.cleaner.ttl property. Otherwise, you will accumulate 
shuffle metadata in the memory of the driver.
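
For example, set it on the SparkConf when building the long-lived context (the value is in seconds and purely illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("long-lived-analytics")
  .set("spark.cleaner.ttl", "3600")   // periodically forget metadata older than one hour
val sc = new SparkContext(conf)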



Sent with Good (www.good.com)


-Original Message-
From: Silvio Fiorito [silvio.fior...@granturing.com]
Sent: Monday, May 11, 2015 01:03 PM Eastern Standard Time
To: stanley; user@spark.apache.org
Subject: Re: Does long-lived SparkContext hold on to executor resources?


You want to look at dynamic resource allocation, here: 
http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation





On 5/11/15, 11:23 AM, stanley wangshua...@yahoo.com wrote:

I am building an analytics app with Spark. I plan to use long-lived
SparkContexts to minimize the overhead for creating Spark contexts, which in
turn reduces the analytics query response time.

The number of queries that are run in the system is relatively small each
day. Would long-lived contexts hold on to the executor resources when there
are no queries running? Is there a way to free executor resources in this
type of use case?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-long-lived-SparkContext-hold-on-to-executor-resources-tp22848.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.






RE: ReduceByKey and sorting within partitions

2015-04-27 Thread Ganelin, Ilya
Marco - why do you want data sorted both within and across partitions? If you 
need to take an ordered sequence across all your data, you need to either 
aggregate your RDD on the driver and sort it, or use zipWithIndex to apply an 
ordered index to your data that matches the order it was stored on HDFS. You 
can then get the data in order by filtering based on that index. You can still 
extract elements in order by using a filter on the zipped index, e.g. 
rdd.filter(s => s._2 < 50).sortBy(_._1). Let me know if that's not what you need - thanks!
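
A minimal sketch of the zipWithIndex approach (path and cutoff are placeholders):

val lines = sc.textFile("hdfs:///data/input.txt")
val indexed = lines.zipWithIndex()                 // (element, index within the RDD)
val firstFifty = indexed
  .filter { case (_, idx) => idx < 50 }            // keep a slice by index
  .sortBy { case (_, idx) => idx }                 // order the slice by that index
  .map { case (line, _) => line }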



Sent with Good (www.good.com)


-Original Message-
From: Marco [marcope...@gmail.com]
Sent: Monday, April 27, 2015 07:01 AM Eastern Standard Time
To: user@spark.apache.org
Subject: ReduceByKey and sorting within partitions


Hi,

I'm trying, after reducing by key, to get data ordered among partitions
(like RangePartitioner) and within partitions (like sortByKey or
repartitionAndSortWithinPartitions), pushing the sorting down to the
shuffle machinery of the reduce phase.

I think, but maybe I'm wrong, that the correct way to do that is to have
combineByKey call the setKeyOrdering function on the ShuffledRDD that it returns.

Am I wrong? Can it be done by a combination of other transformations with
the same efficiency?

Thanks,
Marco






RE: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?

2015-04-27 Thread Ganelin, Ilya
What command are you using to untar? Are you running out of disk space?



Sent with Good (www.good.com)


-Original Message-
From: ÐΞ€ρ@Ҝ (๏̯͡๏) [deepuj...@gmail.com]
Sent: Monday, April 27, 2015 11:44 AM Eastern Standard Time
To: user
Subject: Spark 1.3.1 Hadoop 2.4 Prebuilt package broken ?

I downloaded the 1.3.1 Hadoop 2.4 prebuilt package (tar) from multiple mirrors and 
the direct link. Each time I untar it I get the below error:


spark-1.3.1-bin-hadoop2.4/lib/spark-assembly-1.3.1-hadoop2.4.0.jar: (Empty 
error message)

tar: Error exit delayed from previous errors


Is it broken?

--
Deepak





RE: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

2015-04-24 Thread Ganelin, Ilya
If you're reading a file line by line then you should simply use Java's Hadoop 
FileSystem class to read the file with a BufferedInputStream. I don't think you 
need an RDD here.
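
A minimal sketch of reading the file sequentially through the Hadoop FileSystem API (the path is a placeholder):

import java.io.{BufferedReader, InputStreamReader}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(new Configuration())
val reader = new BufferedReader(new InputStreamReader(fs.open(new Path("hdfs:///data/input.txt"))))
try {
  var line = reader.readLine()
  while (line != null) {
    // lines arrive strictly in file order here
    line = reader.readLine()
  }
} finally {
  reader.close()
}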



Sent with Good (www.good.com)


-Original Message-
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 11:04 AM Eastern Standard Time
To: Ganelin, Ilya
Cc: Spico Florin; user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input 
data from Hadoop?

The problem I'm facing is that I need to process lines from the input file in the 
order they're stored in the file, as they define the order of updates I need to 
apply to some data, and these updates are not commutative, so that order matters. 
Unfortunately the input is purely order-based; there's no timestamp per line 
etc. in the file, and I'd prefer to avoid preparing the file in advance by 
adding ordinals before / after each line. Of the approaches you suggested, the 
first two won't work as there's nothing I could sort by. I'm not sure about the 
third one - I'm just not sure what you meant there, to be honest :-)

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 15:48, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
Michael - you need to sort your RDD. Check out the shuffle documentation on the 
Spark Programming Guide. It talks about this specifically. You can resolve this 
in a couple of ways - either by collecting your RDD and sorting it, using 
sortBy, or not worrying about the internal ordering. You can still extract 
elements in order by using a filter on the zipped index, e.g. 
rdd.filter(s => s._2 < 50).sortBy(_._1)



Sent with Good (www.good.com)



-Original Message-
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 10:41 AM Eastern Standard Time
To: Spico Florin
Cc: user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input 
data from Hadoop?

Of course after you do it, you probably want to call repartition(somevalue) on 
your RDD to get your paralellism back.

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 15:28, Michal Michalski michal.michal...@boxever.com wrote:
I did a quick test as I was curious about it too. I created a file with numbers 
from 0 to 999, in order, line by line. Then I did:

scala> val numbers = sc.textFile("./numbers.txt")
scala> val zipped = numbers.zipWithUniqueId
scala> zipped.foreach(i => println(i))

Expected result if the order was preserved would be something like: (0, 0), (1, 
1) etc.
Unfortunately, the output looks like this:

(126,1)
(223,2)
(320,3)
(1,0)
(127,11)
(2,10)
(...)

The workaround I found that works for me for my specific use case (relatively 
small input files) is setting explicitly the number of partitions to 1 when 
reading a single *text* file:

scala> val numbers_sp = sc.textFile("./numbers.txt", 1)

Then the output is exactly as I would expect.

I didn't dive into the code too much, but I took a very quick look at it and 
figured out - correct me if I missed something, it's Friday afternoon! ;-)  - 
that this workaround will work fine for all the input formats inheriting from 
org.apache.hadoop.mapred.FileInputFormat including TextInputFormat, of course - 
see the implementation of getSplits() method there ( 
http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29
 ).
The numSplits variable passed there is exactly the same value as you provide as 
a second argument to textFile, which is minPartitions. However, while *min* 
suggests that we can only define a minimum number of partitions and have 
no control over the max, from what I can see in the code, that value specifies 
the *exact* number of partitions per the FileInputFormat.getSplits 
implementation. Of course it can differ for other input formats, but in this 
case it should work just fine.


Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 14:05, Spico Florin spicoflo...@gmail.com wrote:
Hello!
  I know that HadoopRDD partitions are built based on the number of splits in 
HDFS. I'm wondering if these partitions preserve the initial order of data in 
file.
As an example, if I have an HDFS (myTextFile) file that has these splits:

split 0 - line 1, ..., line k
split 1 - line k+1, ..., line k+n
split 2 - line k+n+1, ..., line k+n+m

and the code

val lines = sc.textFile("hdfs://mytextFile")
lines.zipWithIndex()

will the order of lines be preserved?
(line 1, zipIndex 1), ..., (line k, zipIndex k), and so on.

I found

RE: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

2015-04-24 Thread Ganelin, Ilya
Michael - you need to sort your RDD. Check out the shuffle documentation on the 
Spark Programming Guide. It talks about this specifically. You can resolve this 
in a couple of ways - either by collecting your RDD and sorting it, using 
sortBy, or not worrying about the internal ordering. You can still extract 
elements in order by using a filter with the zip if e.g RDD.filter(s = s._2  
50).sortBy(_._1)



Sent with Good (www.good.com)


-Original Message-
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 10:41 AM Eastern Standard Time
To: Spico Florin
Cc: user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input 
data from Hadoop?

Of course after you do it, you probably want to call repartition(somevalue) on 
your RDD to get your paralellism back.

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 15:28, Michal Michalski michal.michal...@boxever.com wrote:
I did a quick test as I was curious about it too. I created a file with numbers 
from 0 to 999, in order, line by line. Then I did:

scala> val numbers = sc.textFile("./numbers.txt")
scala> val zipped = numbers.zipWithUniqueId
scala> zipped.foreach(i => println(i))

Expected result if the order was preserved would be something like: (0, 0), (1, 
1) etc.
Unfortunately, the output looks like this:

(126,1)
(223,2)
(320,3)
(1,0)
(127,11)
(2,10)
(...)

The workaround I found that works for me for my specific use case (relatively 
small input files) is setting explicitly the number of partitions to 1 when 
reading a single *text* file:

scala> val numbers_sp = sc.textFile("./numbers.txt", 1)

Than the output is exactly as I would expect.

I didn't dive into the code too much, but I took a very quick look at it and 
figured out - correct me if I missed something, it's Friday afternoon! ;-)  - 
that this workaround will work fine for all the input formats inheriting from 
org.apache.hadoop.mapred.FileInputFormat including TextInputFormat, of course - 
see the implementation of getSplits() method there ( 
http://grepcode.com/file/repo1.maven.org/maven2/org.jvnet.hudson.hadoop/hadoop-core/0.19.1-hudson-2/org/apache/hadoop/mapred/FileInputFormat.java#FileInputFormat.getSplits%28org.apache.hadoop.mapred.JobConf%2Cint%29
 ).
The numSplits variable passed there is exactly the same value as you provide as 
a second argument to textFile, which is minPartitions. However, while *min* 
suggests that we can only define a minimal number of partitions, while we have 
no control over the max, from what I can see in the code, that value specifies 
the *exact* number of partitions per the FileInputFormat.getSplits 
implementation. Of course it can differ for other input formats, but in this 
case it should work just fine.


Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 14:05, Spico Florin spicoflo...@gmail.com wrote:
Hello!
  I know that HadoopRDD partitions are built based on the number of splits in 
HDFS. I'm wondering if these partitions preserve the initial order of data in 
file.
As an example, if I have an HDFS (myTextFile) file that has these splits:

split 0- line 1, ..., line k
split 1-line k+1,..., line k+n
splt 2-line k+n, line k+n+m

and the code
val lines = sc.textFile("hdfs://mytextFile")
lines.zipWithIndex()

will the order of lines preserved?
(line 1, zipIndex 1) , .. (line k, zipIndex k), and so one.

I found this question on stackoverflow 
(http://stackoverflow.com/questions/26046410/how-can-i-obtain-an-element-position-in-sparks-rdd)
 whose answer intrigued me:
Essentially, RDD's zipWithIndex() method seems to do this, but it won't 
preserve the original ordering of the data the RDD was created from

Can you please confirm that is this the correct answer?

Thanks.
 Florin











RE: Does HadoopRDD.zipWithIndex method preserve the order of the input data from Hadoop?

2015-04-24 Thread Ganelin, Ilya
To maintain the order you can use zipWithIndex as Sean Owen pointed out. This 
is the same as zipWithUniqueId except the assigned number is the index of the 
data in the RDD which I believe matches the order of data as it's stored on 
HDFS.



Sent with Good (www.good.com)


-Original Message-
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 11:18 AM Eastern Standard Time
To: Ganelin, Ilya
Cc: Spico Florin; user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input 
data from Hadoop?

I read it one by one as I need to maintain the order, but it doesn't mean that 
I process them one by one later. Input lines refer to different entities I 
update, so once I read them in order, I group them by the id of the entity I 
want to update, sort the updates on per-entity basis and process them further 
in parallel (including writing data to C* and Kafka at the very end). That's 
what I use Spark for - the first step I ask about is just a requirement related 
to the input format I get and need to support. Everything what happens after 
that is just a normal data processing job that you want to distribute.

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 16:10, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
If you're reading a file one by line then you should simply use Java's Hadoop 
FileSystem class to read the file with a BuffereInputStream. I don't think you 
need an RDD here.



Sent with Good (www.good.com)


-Original Message-
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 11:04 AM Eastern Standard Time
To: Ganelin, Ilya
Cc: Spico Florin; user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input 
data from Hadoop?

The problem I'm facing is that I need to process lines from input file in the 
order they're stored in the file, as they define the order of updates I need to 
apply on some data and these updates are not commutative so that order matters. 
Unfortunately the input is purely order-based, theres no timestamp per line 
etc. in the file and I'd prefer to avoid preparing the file in advance by 
adding ordinals before / after each line. From the approaches you suggested 
first two won't work as there's nothing I could sort by. I'm not sure about the 
third one - I'm just not sure what you meant there to be honest :-)

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 15:48, Ganelin, Ilya ilya.gane...@capitalone.com wrote:
Michael - you need to sort your RDD. Check out the shuffle documentation on the 
Spark Programming Guide. It talks about this specifically. You can resolve this 
in a couple of ways - either by collecting your RDD and sorting it, using 
sortBy, or not worrying about the internal ordering. You can still extract 
elements in order by using a filter on the zipped index, e.g. 
rdd.filter(s => s._2 < 50).sortBy(_._1)



Sent with Good (www.good.com)



-Original Message-
From: Michal Michalski [michal.michal...@boxever.com]
Sent: Friday, April 24, 2015 10:41 AM Eastern Standard Time
To: Spico Florin
Cc: user
Subject: Re: Does HadoopRDD.zipWithIndex method preserve the order of the input 
data from Hadoop?

Of course after you do it, you probably want to call repartition(somevalue) on 
your RDD to get your paralellism back.

Kind regards,
Michał Michalski,
michal.michal...@boxever.com

On 24 April 2015 at 15:28, Michal Michalski michal.michal...@boxever.com wrote:
I did a quick test as I was curious about it too. I created a file with numbers 
from 0 to 999, in order, line by line. Then I did:

scala> val numbers = sc.textFile("./numbers.txt")
scala> val zipped = numbers.zipWithUniqueId
scala> zipped.foreach(i => println(i))

Expected result if the order was preserved would be something like: (0, 0), (1, 
1) etc.
Unfortunately, the output looks like this:

(126,1)
(223,2)
(320,3)
(1,0)
(127,11)
(2,10)
(...)

The workaround I found that works for me for my specific use case (relatively 
small input files) is setting explicitly the number of partitions to 1 when 
reading a single *text* file:

scala> val numbers_sp = sc.textFile("./numbers.txt", 1)

Than the output is exactly as I would expect.

I didn't dive into the code too much, but I took a very quick look at it and 
figured out - correct me if I missed something, it's Friday afternoon! ;-)  - 
that this workaround will work fine for all the input formats inheriting from 
org.apache.hadoop.mapred.FileInputFormat including TextInputFormat, of course - 
see the implementation of getSplits() method

RE: Map Question

2015-04-23 Thread Ganelin, Ilya
You need to expose that variable the same way you'd expose any other variable 
in Python that you wanted to see across modules. As long as you share a spark 
context all will work as expected.

http://stackoverflow.com/questions/142545/python-how-to-make-a-cross-module-variable



Sent with Good (www.good.com)


-Original Message-
From: Vadim Bichutskiy [vadim.bichuts...@gmail.com]
Sent: Thursday, April 23, 2015 12:00 PM Eastern Standard Time
To: Tathagata Das
Cc: user@spark.apache.org
Subject: Re: Map Question

Here it is. How do I access a broadcastVar in a function that's in another 
module (process_stuff.py below):

Thanks,
Vadim

main.py
---

from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext
from process_stuff import myfunc
from metadata import get_metadata

conf = SparkConf().setAppName('My App').setMaster('local[4]')
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)
sqlContext = SQLContext(sc)

distFile = ssc.textFileStream("s3n://...")

distFile.foreachRDD(process)

mylist = get_metadata()

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

## mylist and broadcastVar, broadcastVar.value print fine

def getSqlContextInstance(sparkContext):

    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']

def process(rdd):

    sqlContext = getSqlContextInstance(rdd.context)

    if rdd.take(1):

        jsondf = sqlContext.jsonRDD(rdd)

        #jsondf.printSchema()

        jsondf.registerTempTable('mytable')

        stuff = sqlContext.sql("SELECT ...")
        stuff_mapped = stuff.map(myfunc)  ## I want myfunc to see mylist from above?

        ...

process_stuff.py
--

def myfunc(x):

    metadata = broadcastVar.value  # NameError: broadcastVar not found -- HOW TO FIX?

    ...


metadata.py


def get_metadata():

...

return mylist


On Wed, Apr 22, 2015 at 6:47 PM, Tathagata Das t...@databricks.com wrote:
Can you give full code? especially the myfunc?

On Wed, Apr 22, 2015 at 2:20 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:
Here's what I did:

print 'BROADCASTING...'
broadcastVar = sc.broadcast(mylist)
print broadcastVar
print broadcastVar.value
print 'FINISHED BROADCASTING...'

The above works fine,

but when I call myrdd.map(myfunc) I get NameError: global name 'broadcastVar' 
is not defined

The myfunc function is in a different module. How do I make it aware of 
broadcastVar?

On Wed, Apr 22, 2015 at 2:13 PM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:
Great. Will try to modify the code. Always room to optimize!

On Wed, Apr 22, 2015 at 2:11 PM, Tathagata Das t...@databricks.com wrote:
Absolutely. The same code would work for local as well as distributed mode!

On Wed, Apr 22, 2015 at 11:08 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:
Can I use broadcast vars in local mode?

On Wed, Apr 22, 2015 at 2:06 PM, Tathagata Das t...@databricks.com wrote:
Yep. Not efficient. Pretty bad actually. That's why broadcast variables were 
introduced right at the very beginning of Spark.



On Wed, Apr 22, 2015 at 10:58 AM, Vadim Bichutskiy vadim.bichuts...@gmail.com wrote:
Thanks TD. I was looking into broadcast variables.

Right now I am running it locally...and I plan to move it to production on 
EC2.

The way I fixed it is by doing myrdd.map(lambda x: (x, mylist)).map(myfunc) but 
I don't think it's efficient?

mylist is filled only once at the start and never changes.

Vadim

On Wed, Apr 22, 2015 at 1:42 PM, Tathagata Das t...@databricks.com wrote:
Is the mylist present on every executor? If not, then you have to pass it on. 
And broadcasts are the best way to pass them on. But note that once broadcast, 
it will be immutable at the executors, and if you update the list at the driver, 
you will have to broadcast it again.

TD


RE: spark with kafka

2015-04-18 Thread Ganelin, Ilya
Write the Kafka stream to HDFS via Spark Streaming, then ingest the files via Spark 
from HDFS.
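
A rough sketch of that two-step approach (Zookeeper quorum, group id, topic and paths are all placeholders), using the receiver-based Kafka stream from spark-streaming-kafka:

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// step 1: a streaming job lands raw messages on HDFS, one directory per batch interval
val ssc = new StreamingContext(sc, Seconds(60))
val stream = KafkaUtils.createStream(ssc, "zk1:2181", "landing-group", Map("mytopic" -> 1))
stream.map(_._2).saveAsTextFiles("hdfs:///landing/mytopic/batch")
ssc.start()

// step 2: a separate batch job later reads whatever has landed
val landed = sc.textFile("hdfs:///landing/mytopic/batch-*")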



Sent with Good (www.good.com)


-Original Message-
From: Shushant Arora [shushantaror...@gmail.com]
Sent: Saturday, April 18, 2015 06:44 AM Eastern Standard Time
To: user
Subject: spark with kafka

Hi

I want to consume messages from a Kafka queue using a Spark batch program, not 
Spark Streaming. Is there any way to achieve this, other than using the low-level 
(simple) Kafka consumer API?

Thanks




Re: mapPartitions - How Does it Works

2015-03-18 Thread Ganelin, Ilya
Map partitions works as follows :
1) For each partition of your RDD, it provides an iterator over the values
within that partition
2) You then define a function that operates on that iterator

Thus if you do the following:

val parallel = sc.parallelize(1 to 10, 3)

parallel.mapPartitions( x => x.map(s => s + 1)).collect



You would get:
res3: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11)


In your example, x is not a pointer that traverses the iterator (e.g. with
.next()); it's literally the iterator object itself. With 1 to 10 split across
three partitions, the partitions hold (1, 2, 3), (4, 5, 6) and (7, 8, 9, 10), and
List(x.next).iterator keeps only the first element of each partition's iterator,
hence 1, 4, 7.
On 3/18/15, 10:19 AM, ashish.usoni ashish.us...@gmail.com wrote:

I am trying to understand about mapPartitions but i am still not sure how
it
works

in the below example it create three partition
val parallel = sc.parallelize(1 to 10, 3)

and when we do below
parallel.mapPartitions( x => List(x.next).iterator).collect

it prints value 
Array[Int] = Array(1, 4, 7)

Can some one please explain why it prints 1,4,7 only

Thanks,




--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/mapPartitions-How-Does
-it-Works-tp22123.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.










RE: Brodcast Variable updated from one transformation and used from another

2015-02-24 Thread Ganelin, Ilya
You're not using the broadcasted variable within your map operations. You're 
attempting to modify myObject directly, which won't work because you are 
modifying the serialized copy on the executor. You want to do 
myObjectBroadcasted.value.insert and myObjectBroadcasted.value.lookup.



Sent with Good (www.good.com)


-Original Message-
From: Yiannis Gkoufas [johngou...@gmail.com]
Sent: Tuesday, February 24, 2015 12:12 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Brodcast Variable updated from one transformation and used from another

Hi all,

I am trying to do the following.

val myObject = new MyObject();
val myObjectBroadcasted = sc.broadcast(myObject);

val rdd1 = sc.textFile("/file1").map(e =>
{
  myObject.insert(e._1);
  (e._1, 1)
});
rdd.cache.count(); //to make sure it is transformed.

val rdd2 = sc.textFile("/file2").map(e =>
{
  val lookedUp = myObject.lookup(e._1);
  (e._1, lookedUp)
});

When I check the contents of myObject within the map of rdd1, everything seems 
ok.
On the other hand, when I check the contents of myObject within the map of rdd2, 
it seems to be empty.
Am I doing something wrong?

Thanks a lot!




RE: Spark job fails on cluster but works fine on a single machine

2015-02-19 Thread Ganelin, Ilya
When writing to hdfs Spark will not overwrite existing files or directories. 
You must either manually delete these or use Java's Hadoop FileSystem class to 
remove them.
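
A minimal sketch of the delete step, using the output path from the job below:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(sc.hadoopConfiguration)
val out = new Path("/tmp/pavel/CassandraPipeTest.txt")
if (fs.exists(out)) fs.delete(out, true)   // recursive delete, run before saveAsTextFile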



Sent with Good (www.good.com)


-Original Message-
From: Pavel Velikhov [pavel.velik...@gmail.com]
Sent: Thursday, February 19, 2015 11:32 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Spark job fails on cluster but works fine on a single machine

I have a simple Spark job that goes out to Cassandra, runs a pipe and stores 
results:

val sc = new SparkContext(conf)
val rdd = sc.cassandraTable("keyspace", "table")
  .map(r => r.getInt("column") + "\t" + write(get_lemmas(r.getString("tags"))))
  .pipe("python3 /tmp/scripts_and_models/scripts/run.py")
  .map(r => convertStr(r))
  .coalesce(1, true)
  .saveAsTextFile("/tmp/pavel/CassandraPipeTest.txt")
  //.saveToCassandra("keyspace", "table", SomeColumns("id", "data"))

When run on a single machine, everything is fine if I save to an hdfs file or 
save to Cassandra.
When run in cluster neither works:

 - When saving to file, I get an exception: User class threw exception: Output 
directory hdfs://hadoop01:54310/tmp/pavel/CassandraPipeTest.txt already exists
 - When saving to Cassandra, only 4 rows are updated with empty data (I test on 
a 4-machine Spark cluster)

Any hints on how to debug this and where the problem could be?

- I delete the hdfs file before running
- Would really like the output to hdfs to work, so I can debug
- Then it would be nice to save to Cassandra




RE: RDD Partition number

2015-02-19 Thread Ganelin, Ilya
As Ted Yu points out, default block size is 128MB as of Hadoop 2.1.



Sent with Good (www.good.com)


-Original Message-
From: Ilya Ganelin [ilgan...@gmail.com]
Sent: Thursday, February 19, 2015 12:13 PM Eastern Standard Time
To: Alessandro Lulli; user@spark.apache.org
Cc: Massimiliano Bertolucci
Subject: Re: RDD Partition number

By default you will have (fileSize in Mb / 64) partitions. You can also set the 
number of partitions when you read in a file with sc.textFile as an optional 
second parameter.
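
For example (the partition count is purely illustrative):

// ask for at least 200 partitions up front instead of the block-based default
val lines = sc.textFile("hdfs:///data/big-input.txt", 200)
println(lines.partitions.length)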
On Thu, Feb 19, 2015 at 8:07 AM Alessandro Lulli lu...@di.unipi.it wrote:
Hi All,

Could you please help me understanding how Spark defines the number of 
partitions of the RDDs if not specified?

I found the following in the documentation for file loaded from HDFS:
The textFile method also takes an optional second argument for controlling the 
number of partitions of the file. By default, Spark creates one partition for 
each block of the file (blocks being 64MB by default in HDFS), but you can also 
ask for a higher number of partitions by passing a larger value. Note that you 
cannot have fewer partitions than blocks

What is the rule for a file loaded from the local file system?
For instance, I have a file X replicated on 4 machines. If I load the file X into 
an RDD, how many partitions are defined, and why?

Thanks for your help on this
Alessandro




RE: Is there a fast way to do fast top N product recommendations for all users

2015-02-12 Thread Ganelin, Ilya
Hi all - I've spent a while playing with this. Two significant sources of speed 
up that I've achieved are

1) Manually multiplying the feature vectors and caching either the user or 
product vector

2) By doing so, if one of the RDDs is available globally, it becomes possible to 
parallelize this step by running it in a thread and submitting multiple threads 
to the YARN engine.

Doing so I've achieved an over 75x speed-up compared with the packaged version 
inside MLlib.
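
For illustration, a minimal sketch of the manual approach (names are placeholders; 
it assumes a trained MatrixFactorizationModel whose product side is small enough to 
collect and broadcast):

import org.apache.spark.mllib.recommendation.{MatrixFactorizationModel, Rating}

def recommendTopNForAllUsers(model: MatrixFactorizationModel, topN: Int) = {
  // Collect the small side (a few thousand products) and broadcast it to every executor.
  val products = model.productFeatures.collect()
  val productsBc = model.userFeatures.sparkContext.broadcast(products)

  model.userFeatures.map { case (user, userVec) =>
    val scored = productsBc.value.map { case (product, prodVec) =>
      // Manual dot product of the user and product feature vectors.
      val score = userVec.zip(prodVec).map { case (a, b) => a * b }.sum
      Rating(user, product, score)
    }
    (user, scored.sortBy(-_.rating).take(topN))
  }
}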





-Original Message-
From: Sean Owen [so...@cloudera.commailto:so...@cloudera.com]
Sent: Thursday, February 12, 2015 05:47 PM Eastern Standard Time
To: Crystal Xing
Cc: user@spark.apache.org
Subject: Re: Is there a fast way to do fast top N product recommendations for 
all users


Not now, but see https://issues.apache.org/jira/browse/SPARK-3066

As an aside, it's quite expensive to make recommendations for all
users. IMHO this is not something to do, if you can avoid it
architecturally. For example, consider precomputing recommendations
only for users whose probability of needing recommendations soon is
not very small. Usually, only a small number of users are active.

On Thu, Feb 12, 2015 at 10:26 PM, Crystal Xing crystalxin...@gmail.com wrote:
 Hi,


 I wonder if there is a way to do fast top N product recommendations for all
 users in training using mllib's ALS algorithm.

 I am currently calling

 public Rating[] recommendProducts(int user,
  int num)

 method in MatrixFactorizationModel for users one by one
 and it is quite slow since it does not operate on RDD input?

 I also tried to generate all possible
 user-product pairs and use
 public JavaRDD<Rating> predict(JavaPairRDD<Integer, Integer> usersProducts)

 to fill out the matrix. Since I have a large number of users and products,

 the job gets stuck transforming all the pairs.


 I wonder if there is a better way to do this.

 Thanks,

 Crystal.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





RE: spark challenge: zip with next???

2015-01-29 Thread Ganelin, Ilya
Make a copy of your RDD with an extra entry in the beginning to offset it. Then you 
can zip the two RDDs and run a map to generate an RDD of differences.
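
A minimal sketch of that idea, using zipWithIndex plus a shifted-index join rather 
than a literal RDD.zip (zip requires both RDDs to have identical partitioning, which 
the copy-and-offset trick does not guarantee); dates is assumed to be an RDD[Long] of 
timestamps already in order:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

def largeGaps(dates: RDD[Long], maxGapMillis: Long): RDD[Long] = {
  val indexed = dates.zipWithIndex().map { case (d, i) => (i, d) }
  val shifted = indexed.map { case (i, d) => (i - 1, d) }   // element i+1 keyed under index i
  indexed.join(shifted)                                     // (i, (date_i, date_i+1))
    .map { case (_, (current, next)) => next - current }    // difference to the next entry
    .filter(_ > maxGapMillis)                               // keep only the large gaps
}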





-Original Message-
From: derrickburns [derrickrbu...@gmail.commailto:derrickrbu...@gmail.com]
Sent: Thursday, January 29, 2015 02:52 PM Eastern Standard Time
To: user@spark.apache.org
Subject: spark challenge: zip with next???


Here is a spark challenge for you!

I have a data set where each entry has a date.  I would like to identify
gaps in the dates larger than a given length.  For example, if the data
were log entries, then the gaps would tell me when I was missing log data
for long periods of time. What is the most efficient way to achieve this in
Spark?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/spark-challenge-zip-with-next-tp21423.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





RE: quickly counting the number of rows in a partition?

2015-01-13 Thread Ganelin, Ilya
An alternative to doing a naive toArray is to declare an accumulator per partition 
and use that. This is specifically what they were designed for. See the 
programming guide.
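
For illustration, a minimal sketch of that idea (assuming a Spark 1.x shell where sc 
is in scope; the example RDD is a placeholder, and an action is needed so the 
accumulators actually get updated):

val rdd = sc.parallelize(1 to 100000, 8)   // example data spread over 8 partitions

// One accumulator per partition; each partition adds its own row count.
val partitionCounts = rdd.partitions.indices.map(_ => sc.accumulator(0L))

rdd.mapPartitionsWithIndex { (idx, iter) =>
  partitionCounts(idx) += iter.size.toLong   // count the iterator instead of materializing an array
  Iterator.empty
}.count()                                    // any action will do; it just forces the update

partitionCounts.zipWithIndex.foreach { case (acc, idx) =>
  println(s"partition $idx: ${acc.value} rows")
}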





-Original Message-
From: Tobias Pfeiffer [t...@preferred.jpmailto:t...@preferred.jp]
Sent: Tuesday, January 13, 2015 08:06 PM Eastern Standard Time
To: Kevin Burton
Cc: Ganelin, Ilya; user@spark.apache.org
Subject: Re: quickly counting the number of rows in a partition?

Hi,

On Mon, Jan 12, 2015 at 8:09 PM, Ganelin, Ilya 
ilya.gane...@capitalone.commailto:ilya.gane...@capitalone.com wrote:
Use the mapPartitions function. It returns an iterator to each partition. Then 
just get that length by converting to an array.

On Tue, Jan 13, 2015 at 2:50 PM, Kevin Burton 
bur...@spinn3r.commailto:bur...@spinn3r.com wrote:
Doesn’t that just read in all the values?  The count isn’t pre-computed? It’s 
not the end of the world if it’s not but would be faster.

Well, converting to an array may not work due to memory constraints, counting 
the items in the iterator may be better. However, there is no pre-computed 
value. For counting, you need to compute all values in the RDD, in general. If 
you think of

items.map(x => /* throw exception */).count()

then even though the count you want to get does not necessarily require the 
evaluation of the function in map() (i.e., the number is the same), you may not 
want to get the count if that code actually fails.

Tobias




Re: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

2015-01-12 Thread Ganelin, Ilya
There are two related options:

To solve your problem directly try:

val conf = new SparkConf().set("spark.yarn.driver.memoryOverhead", "1024")
val sc = new SparkContext(conf)

And the second, which increases the overall memory available on the driver, as 
part of your spark-submit script add:

--driver-memory 2g


Hope this helps!


From: David McWhorter mcwhor...@ccri.commailto:mcwhor...@ccri.com
Date: Monday, January 12, 2015 at 11:01 AM
To: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: configuring spark.yarn.driver.memoryOverhead on Spark 1.2.0

Hi all,

I'm trying to figure out how to set this option:  
spark.yarn.driver.memoryOverhead on Spark 1.2.0.  I found this helpful 
overview 
http://apache-spark-user-list.1001560.n3.nabble.com/Stable-spark-streaming-app-td14105.html#a14476,
 which suggests to launch with --spark.yarn.driver.memoryOverhead 1024 added to 
spark-submit.  However, when I do that I get this error:
Error: Unrecognized option '--spark.yarn.driver.memoryOverhead'.
Run with --help for usage help or --verbose for debug output
I have also tried calling sparkConf.set("spark.yarn.driver.memoryOverhead", 
"1024") on my spark configuration object but I still get "Will allocate AM 
container, with  MB memory including 384 MB overhead" when launching.  I'm 
running in yarn-cluster mode.

Any help or tips would be appreciated.

Thanks,
David

--

David McWhorter
Software Engineer
Commonwealth Computer Research, Inc.
1422 Sachem Place, Unit #1
Charlottesville, VA 22901
mcwhor...@ccri.commailto:mcwhor...@ccri.com | 434.299.0090x204





RE: MatrixFactorizationModel serialization

2015-01-07 Thread Ganelin, Ilya
Try loading the features as:

val userFeatures = sc.objectFile(path1)
val productFeatures = sc.objectFile(path2)

and then call the constructor of MatrixFactorizationModel with those.
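
For illustration, a minimal sketch of that suggestion (the paths mirror the ones in 
the quoted message below and are placeholders; it assumes the MatrixFactorizationModel 
constructor is accessible in your Spark version, which may not be the case in some 
early 1.x releases, and that rank matches the value used at training time):

import org.apache.spark.mllib.recommendation.MatrixFactorizationModel

val rank = 10   // must match the rank used when the model was trained
val userFeatures = sc.objectFile[(Int, Array[Double])]("hdfs://***:9000/spark/results/userfeatures")
val productFeatures = sc.objectFile[(Int, Array[Double])]("hdfs://***:9000/spark/results/productfeatures")

val model = new MatrixFactorizationModel(rank, userFeatures, productFeatures)
val score = model.predict(1, 42)   // e.g. predicted rating of product 42 for user 1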





-Original Message-
From: wanbo [gewa...@163.commailto:gewa...@163.com]
Sent: Wednesday, January 07, 2015 10:54 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Re: MatrixFactorizationModel serialization


I save and reload model like this:

val bestModel = ALS.train(training, rank, numIter, lambda)
bestModel.get.userFeatures.saveAsObjectFile("hdfs://***:9000/spark/results/userfeatures")
bestModel.get.productFeatures.saveAsObjectFile("hdfs://***:9000/spark/results/productfeatures")

val bestModel = obj.asInstanceOf[MatrixFactorizationModel]
bestModel.userFeatures.sparkContext.objectFile("hdfs://***:9000/spark/results/userfeatures")
bestModel.productFeatures.sparkContext.objectFile("hdfs://***:9000/spark/results/productfeatures")

But, there has same exception:

Exception in thread Driver java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:162)
Caused by: java.lang.NullPointerException
at
com.ft.jobs.test.ModelDeserialization$.main(ModelDeserialization.scala:138)
at 
com.ft.jobs.test.ModelDeserialization.main(ModelDeserialization.scala)
... 5 more


Have fixed this issue?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/MatrixFactorizationModel-serialization-tp18389p21024.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





HDFS_DELEGATION_TOKEN errors after switching Spark Contexts

2015-01-06 Thread Ganelin, Ilya
Hi all.
In order to get Spark to properly release memory during batch processing as a 
workaround to issue https://issues.apache.org/jira/browse/SPARK-4927 I tear 
down and re-initialize the spark context with :

context.stop() and
context = new SparkContext()

The problem I run into is that eventually I hit the below error:

:15/01/06 13:52:34 INFO BlockManagerMaster: Updated info of block 
broadcast_5_piece0
[1:53pm]:15/01/06 13:52:34 WARN Client: Exception encountered while connecting 
to the server : 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 214318 for zjb238) can't be found in cache
[1:53pm]:Exception in thread main 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken):
 token (HDFS_DELEGATION_TOKEN token 214318 for zjb238) can't be found in cache

This terminates execution but I have no idea why this would be happening. Does 
anyone know what could be at play here? This error appears as soon as I try to 
hit HDFS after re-starting a Spark context. When this issue appears is not 
deterministic and I am able to run several successful iterations before I see 
it.

Any help would be much appreciated. Thank you.




Re: Long-running job cleanup

2014-12-31 Thread Ganelin, Ilya
The previously submitted code doesn’t actually show the problem I was trying to 
demonstrate, since the issue only becomes clear between subsequent steps. Within 
a single step it appears things are cleared up properly. The memory growth 
becomes evident pretty quickly.

def showMemoryUsage(sc: SparkContext) = {
  val usersPerStep = 2500
  val count = 100
  val numSteps = count / usersPerStep

  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures: RDD[(Int, Int)] = sc.parallelize(1 to count)
    .map(s => (s, 2)).partitionBy(new HashPartitioner(200)).cache()
  val productFeatures: RDD[(Int, Int)] = sc.parallelize(1 to 100)
    .map(s => (s, 4)).repartition(1).cache()

  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()

    val results = usersFiltered.map(user => {
      val userScore = userFeatures.lookup(user).head
      val recPerUser = Array(1, 2, userScore)
      recPerUser
    })

    val mapedResults: Array[Int] = results.flatMap(scores => scores).toArray
    log("State: Computed " + mapedResults.length + " predictions for stage " + i)

    sc.parallelize(mapedResults)
    // Write to disk (left out since problem is evident even without it)
  }
}

Example broadcast variable added:

14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
CLIENT_NODE:54640 (size: 794.0 B, free: 441.9 MB)

And then if I parse the entire log looking for “free : XXX.X MB” within a 
single step memory is cleared properly:

Free 441.1 MB
Free 439.8 MB
Free 439.8 MB
Free 441.1 MB
Free 441.1 MB
Free 439.8 MB

But between steps, the amount of available memory decreases (e.g. That range 
that things oscillate between shrinks) and over the course of many hours this 
eventually reduces to zero.

Free 440.7 MB
Free 438.7 MB
Free 438.7 MB
Free 440.7 MB

Free 435.4 MB
Free 425.0 MB
Free 425.0 MB
Free 435.4 MB
Free 425.0 MB
Free 425.0 MB
Free 435.4 MB

Free 426.7 MB
Free 402.5 MB
Free 402.5 MB
Free 426.7 MB
Free 426.7 MB
Free 402.5 MB
Free 402.5 MB
Free 426.7 MB
From: Ganelin, Ganelin, Ilya 
ilya.gane...@capitalone.commailto:ilya.gane...@capitalone.com
Date: Tuesday, December 30, 2014 at 7:30 PM
To: Ilya Ganelin ilgan...@gmail.commailto:ilgan...@gmail.com, Patrick 
Wendell pwend...@gmail.commailto:pwend...@gmail.com
Cc: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Long-running job cleanup

Hi Patrick, to follow up on the below discussion, I am including a short code 
snippet that produces the problem on 1.1. This is kind of stupid code since 
it’s a greatly simplified version of what I’m actually doing but it has a 
number of the key components in place. I’m also including some example log 
output. Thank you.


def showMemoryUsage(sc : SparkContext) = {

  val usersPerStep = 25000
  val count = 100
  val numSteps = count / usersPerStep

  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures : RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
  val productFeatures : RDD[(Int, Int)] = sc.parallelize(1 to 5000)
    .map(s => (s, 4)).cache()

  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()

    usersFiltered.foreach(user => {
      val mult = productFeatures.map(s => s._2 * userFeatures.lookup(user).head)
      mult.takeOrdered(20)

      // Normally this would then be written to disk
      // For the sake of the example this is all we're doing
    })
  }
}

Example broadcast variable added:

14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
innovationclientnode01.cof.ds.capitalone.com:54640 (size: 794.0 B, free: 441.9 
MB)


And then if I parse the entire log looking for “free : XXX.X MB” I see the 
available memory slowly ticking away:

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

…

Free 441.7 MB

Free 441.7 MB

Free 441.7 MB

Free 441.7 MB

And so on.


Clearly the above code is not persisting the intermediate RDD (mult), yet 
memory is never being properly freed up.

From: Ilya Ganelin ilgan...@gmail.commailto:ilgan...@gmail.com
Date: Sunday, December 28, 2014 at 4:02 PM
To: Patrick Wendell pwend...@gmail.commailto:pwend...@gmail.com, Ganelin, 
Ilya ilya.gane...@capitalone.commailto:ilya.gane...@capitalone.com
Cc: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Long-running job cleanup

Hi Patrick - is that cleanup present in 1.1?

The overhead I am talking about is with regards to what I believe is shuffle 
related metadata. If I watch the execution log I see small broadcast variables 
created for every stage of execution, a few KB at a time

Re: Long-running job cleanup

2014-12-30 Thread Ganelin, Ilya
Hi Patrick, to follow up on the below discussion, I am including a short code 
snippet that produces the problem on 1.1. This is kind of stupid code since 
it’s a greatly simplified version of what I’m actually doing but it has a 
number of the key components in place. I’m also including some example log 
output. Thank you.


def showMemoryUsage(sc : SparkContext) = {

  val usersPerStep = 25000
  val count = 100
  val numSteps = count / usersPerStep

  val users = sc.parallelize(1 to count)
  val zippedUsers = users.zipWithIndex().cache()
  val userFeatures : RDD[(Int, Int)] = sc.parallelize(1 to count).map(s => (s, 2)).cache()
  val productFeatures : RDD[(Int, Int)] = sc.parallelize(1 to 5000)
    .map(s => (s, 4)).cache()

  for (i <- 1 to numSteps) {
    val usersFiltered = zippedUsers.filter(s => {
      ((i - 1) * usersPerStep <= s._2) && (s._2 < i * usersPerStep)
    }).map(_._1).collect()

    usersFiltered.foreach(user => {
      val mult = productFeatures.map(s => s._2 * userFeatures.lookup(user).head)
      mult.takeOrdered(20)

      // Normally this would then be written to disk
      // For the sake of the example this is all we're doing
    })
  }
}

Example broadcast variable added:

14/12/30 19:25:19 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 
innovationclientnode01.cof.ds.capitalone.com:54640 (size: 794.0 B, free: 441.9 
MB)


And then if I parse the entire log looking for “free : XXX.X MB” I see the 
available memory slowly ticking away:

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

Free 441.8 MB

…

Free 441.7 MB

Free 441.7 MB

Free 441.7 MB

Free 441.7 MB

And so on.


Clearly the above code is not persisting the intermediate RDD (mult), yet 
memory is never being properly freed up.

From: Ilya Ganelin ilgan...@gmail.commailto:ilgan...@gmail.com
Date: Sunday, December 28, 2014 at 4:02 PM
To: Patrick Wendell pwend...@gmail.commailto:pwend...@gmail.com, Ganelin, 
Ilya ilya.gane...@capitalone.commailto:ilya.gane...@capitalone.com
Cc: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: Long-running job cleanup

Hi Patrick - is that cleanup present in 1.1?

The overhead I am talking about is with regards to what I believe is shuffle 
related metadata. If I watch the execution log I see small broadcast variables 
created for every stage of execution, a few KB at a time, and a certain number 
of MB remaining of available memory on the driver. As I run, this available 
memory goes down, and these variables are never erased. The only RDDs that 
persist are those that are explicitly cached. The RDDs that are generated 
iteratively are not retained or referenced, so I would expect things to get 
cleaned up but they do not. The items consuming memory are not RDDs but what 
appears to be shuffle metadata.

I have a script that parses the logs to show memory consumption over time and 
the script shows memory very steadily being consumed over many hours without 
clearing one small bit at a time.

The specific computation I am doing is the generation of dot products between 
two RDDs of vectors. I need to generate this product for every combination of 
products between the two RDDs but both RDDs are too big to fit in memory. 
Consequently, I iteratively generate this product across one entry from the 
first RDD and all entries from the second and retain the pared-down result 
within an accumulator (by retaining the top N results it is possible to 
actually store the Cartesian which is otherwise too large to fit on disk). 
After a certain number of iterations these intermediate results are then 
written to disk. Each of these steps is tractable in itself but due to the 
accumulation of memory, the overall job becomes intractable.

I would appreciate any suggestions as to how to clean up these intermediate 
broadcast variables. Thank you.


On Sun, Dec 28, 2014 at 1:56 PM Patrick Wendell 
pwend...@gmail.commailto:pwend...@gmail.com wrote:
What do you mean when you say the overhead of spark shuffles start to
accumulate? Could you elaborate more?

In newer versions of Spark shuffle data is cleaned up automatically
when an RDD goes out of scope. It is safe to remove shuffle data at
this point because the RDD can no longer be referenced. If you are
seeing a large build up of shuffle data, it's possible you are
retaining references to older RDDs inadvertently. Could you explain
what your job actually doing?

- Patrick

On Mon, Dec 22, 2014 at 2:36 PM, Ganelin, Ilya
ilya.gane...@capitalone.commailto:ilya.gane...@capitalone.com wrote:
 Hi all, I have a long running job iterating over a huge dataset. Parts of
 this operation are cached. Since the job runs for so long, eventually the
 overhead of spark shuffles starts to accumulate culminating in the driver
 starting to swap.

 I am aware of the spark.cleanup.tll parameter that allows me to configure
 when cleanup happens but the issue

Long-running job cleanup

2014-12-22 Thread Ganelin, Ilya
Hi all, I have a long running job iterating over a huge dataset. Parts of this 
operation are cached. Since the job runs for so long, eventually the overhead 
of spark shuffles starts to accumulate culminating in the driver starting to 
swap.

I am aware of the spark.cleanup.tll parameter that allows me to configure when 
cleanup happens but the issue with doing this is that it isn’t done safely, 
e.g. I can be in the middle of processing a stage when this cleanup happens and 
my cached RDDs get cleared. This ultimately causes a KeyNotFoundException when 
I try to reference the now cleared cached RDD. This behavior doesn’t make much 
sense to me, I would expect the cached RDD to either get regenerated or at the 
very least for there to be an option to execute this cleanup without deleting 
those RDDs.

Is there a programmatically safe way of doing this cleanup that doesn’t break 
everything?

If I instead tear down the spark context and bring up a new context for every 
iteration (assuming that each iteration is sufficiently long-lived), would 
memory get released appropriately?




Understanding disk usage with Accumulators

2014-12-16 Thread Ganelin, Ilya
Hi all – I’m running a long running batch-processing job with Spark through 
Yarn. I am doing the following

Batch Process

val resultsArr = 
sc.accumulableCollection(mutable.ArrayBuffer[ListenableFuture[Result]]())

InMemoryArray.forEach{
1) Using a thread pool, generate callable jobs that operate on an RDD
1a) These callable jobs perform an operation combining that RDD and a 
broadcasted array and store the result of that computation as an Array (Result)
2) Store the results of this operation (upon resolution) in the 
accumulableCollection
}

sc.parallelize(resultsArr).saveAsObjectFile (about 1gb of data), happens a 
total of about 4 times during execution over the course of several hours.

My immediate problem is that during this execution two things happen.

Firstly, on my driver node I eventually run out of memory, and start swapping 
to disk (which causes slowdown). However, each Batch can be processed entirely 
within the available memory on the driver, so basically this memory is somehow 
not being released between runs (even though I leave the context of the 
function running the Batch process)

Secondly, during execution, things are being written to HDFS and I am running 
out of space on the local partitions on the node. Note, this is NOT the 
explicit saveAsObjectFile call that I am making, but appears to be something 
going on with Spark internally.


Can anyone speak to what is going on under the hood here and what I can do to 
resolve this?




Re: Understanding disk usage with Accumulators

2014-12-16 Thread Ganelin, Ilya
Also, this may be related to this issue 
https://issues.apache.org/jira/browse/SPARK-3885.

Further, to clarify, data is being written to Hadoop on the data nodes.

Would really appreciate any help. Thanks!

From: Ganelin, Ganelin, Ilya 
ilya.gane...@capitalone.commailto:ilya.gane...@capitalone.com
Date: Tuesday, December 16, 2014 at 10:23 AM
To: 'user@spark.apache.orgmailto:'user@spark.apache.org' 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Understanding disk usage with Accumulators

Hi all – I’m running a long running batch-processing job with Spark through 
Yarn. I am doing the following

Batch Process

val resultsArr = 
sc.accumulableCollection(mutable.ArrayBuffer[ListenableFuture[Result]]())

InMemoryArray.forEach{
1) Using a thread pool, generate callable jobs that operate on an RDD
1a) These callable jobs perform an operation combining that RDD and a 
broadcasted array and store the result of that computation as an Array (Result)
2) Store the results of this operation (upon resolution) in the 
accumulableCollection
}

sc.parallelize(resultsArr).saveAsObjectFile (about 1gb of data), happens a 
total of about 4 times during execution over the course of several hours.

My immediate problem is that during this execution two things happen.

Firstly, on my driver node I eventually run out of memory, and start swapping 
to disk (which causes slowdown). However, each Batch can be processed entirely 
within the available memory on the driver, so basically this memory is somehow 
not being released between runs (even though I leave the context of the 
function running the Batch process)

Secondly, during execution, things are being written to HDFS and I am running 
out of space on the local partitions on the node. Note, this is NOT the 
explicit saveAsObjectFile call that I am making, but appears to be something 
going on with Spark internally.


Can anyone speak to what is going on under the hood here and what I can do to 
resolve this?







Re: MLLib in Production

2014-12-10 Thread Ganelin, Ilya
Hi all – I’ve been storing the model’s internally generated userFeatures and 
productFeatures vectors serialized on disk and importing them in a 
separate job.

From: Sonal Goyal sonalgoy...@gmail.commailto:sonalgoy...@gmail.com
Date: Wednesday, December 10, 2014 at 5:31 AM
To: Yanbo Liang yanboha...@gmail.commailto:yanboha...@gmail.com
Cc: Simon Chan simonc...@gmail.commailto:simonc...@gmail.com, Klausen 
Schaefersinho klaus.schaef...@gmail.commailto:klaus.schaef...@gmail.com, 
user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Re: MLLib in Production

You can also serialize the model and use it in other places.

Best Regards,
Sonal
Founder, Nube Technologieshttp://www.nubetech.co

http://in.linkedin.com/in/sonalgoyal



On Wed, Dec 10, 2014 at 5:32 PM, Yanbo Liang 
yanboha...@gmail.commailto:yanboha...@gmail.com wrote:
Hi Klaus,

There is no ideal method, but there are some workarounds.
Train the model on the Spark or YARN cluster, then use RDD.saveAsTextFile to 
store the model, which includes the weights and intercept, to HDFS.
Load the weights file and intercept file from HDFS, construct a GLM model, and then 
run the model.predict() method; you can get what you want.

The Spark community also has some ongoing work on exporting models with PMML.
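
For illustration, a minimal end-to-end sketch of that workaround for a Spark 1.x 
logistic regression model (the paths, feature values and iteration count are 
placeholders):

import org.apache.spark.mllib.classification.{LogisticRegressionModel, LogisticRegressionWithSGD}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Train on the cluster.
val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(0.0, 1.0)),
  LabeledPoint(1.0, Vectors.dense(1.0, 0.0))))
val model = LogisticRegressionWithSGD.train(training, 10)

// Persist the weights and intercept as a single comma-separated line.
sc.parallelize(Seq((model.weights.toArray :+ model.intercept).mkString(",")), 1)
  .saveAsTextFile("hdfs:///models/lr")

// Elsewhere: reload, rebuild and predict.
val values = sc.textFile("hdfs:///models/lr").first().split(",").map(_.toDouble)
val restored = new LogisticRegressionModel(Vectors.dense(values.dropRight(1)), values.last)
val prediction = restored.predict(Vectors.dense(0.0, 1.0))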

2014-12-10 18:32 GMT+08:00 Simon Chan 
simonc...@gmail.commailto:simonc...@gmail.com:
Hi Klaus,

PredictionIO is an open source product based on Spark MLlib for exactly this 
purpose.
This is the tutorial for classification in particular: 
http://docs.prediction.io/classification/quickstart/

You can add custom serving logics and retrieve prediction result through REST 
API/SDKs at other places.

Simon


On Wed, Dec 10, 2014 at 2:25 AM, Klausen Schaefersinho 
klaus.schaef...@gmail.commailto:klaus.schaef...@gmail.com wrote:
Hi,


I would like to use Spark to train a model, but use the model in some other 
place,, e.g. a servelt to do some classification in real time.

What is the best way to do this? Can I just copy I model file or something and 
load it in the servelt? Can anybody point me to a good tutorial?


Cheers,


Klaus



--
“Overfitting” is not about an excessive amount of physical exercise...







RE: Spark executor lost

2014-12-03 Thread Ganelin, Ilya
You want to look further up the stack (there are almost certainly other errors 
before this one) and those other errors may give you a better idea of what 
is going on. Also, if you are running on YARN you can run "yarn logs 
-applicationId yourAppId" to get the logs from the data nodes.





-Original Message-
From: S. Zhou [myx...@yahoo.com.INVALIDmailto:myx...@yahoo.com.INVALID]
Sent: Wednesday, December 03, 2014 06:30 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Spark executor lost

We are using Spark job server to submit spark jobs (our Spark version is 0.9.1). 
After running the spark job server for a while, we often see the following 
errors (executor lost) in the spark job server log. As a consequence, the spark 
driver (allocated inside spark job server) gradually loses executors. And 
finally the spark job server no longer be able to submit jobs. We tried to 
google the solutions but so far no luck. Please help if you have any ideas. 
Thanks!

[2014-11-25 01:37:36,250] INFO  parkDeploySchedulerBackend [] 
[akka://JobServer/user/context-supervisor/next-staging] - Executor 6 
disconnected, so removing it
[2014-11-25 01:37:36,252] ERROR cheduler.TaskSchedulerImpl [] 
[akka://JobServer/user/context-supervisor/next-staging] - Lost executor 6 on 
: remote Akka client disassociated
[2014-11-25 01:37:36,252] INFO  ark.scheduler.DAGScheduler [] [] - Executor 
lost: 6 (epoch 8)
[2014-11-25 01:37:36,252] INFO  ge.BlockManagerMasterActor [] [] - Trying to 
remove executor 6 from BlockManagerMaster.
[2014-11-25 01:37:36,252] INFO  storage.BlockManagerMaster [] [] - Removed 6 
successfully in removeExecutor
[2014-11-25 01:37:36,286] INFO  ient.AppClient$ClientActor [] 
[akka://JobServer/user/context-supervisor/next-staging] - Executor updated: 
app-20141125002023-0037/6 is now FAILED (Command exited with code 143)






SaveAsTextFile brings down data nodes with IO Exceptions

2014-12-02 Thread Ganelin, Ilya
Hi all, as the last stage of execution, I am writing out a dataset to disk. 
Before I do this, I force the DAG to resolve so this is the only job left in 
the pipeline. The dataset in question is not especially large (a few 
gigabytes). During this step however, HDFS will inevitable crash. I will lose 
connection to data-nodes and get stuck in the loop of death – failure causes 
job restart, eventually causing the overall job to fail. On the data node logs 
I see the errors below. Does anyone have any ideas as to what is going on here? 
Thanks!


java.io.IOException: Premature EOF from inputStream
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at 
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:455)
at 
org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:741)
at 
org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:718)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
at 
org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
at java.lang.Thread.run(Thread.java:745)




innovationdatanode03.cof.ds.capitalone.com:1004:DataXceiver error processing 
WRITE_BLOCK operation  src: /10.37.248.60:44676 dst: /10.37.248.59:1004
java.net.SocketTimeoutException: 65000 millis timeout while waiting for channel 
to be ready for read. ch : java.nio.channels.SocketChannel[connected 
local=/10.37.248.59:43692 remote=/10.37.248.63:1004]
at 
org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
at 
org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at 
org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:2101)
at 
org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:660)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:126)
at 
org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:72)
at 
org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:225)
at java.lang.Thread.run(Thread.java:745)




DataNode{data=FSDataset{dirpath='[/opt/cloudera/hadoop/1/dfs/dn/current, 
/opt/cloudera/hadoop/10/dfs/dn/current, /opt/cloudera/hadoop/2/dfs/dn/current, 
/opt/cloudera/hadoop/3/dfs/dn/current, /opt/cloudera/hadoop/4/dfs/dn/current, 
/opt/cloudera/hadoop/5/dfs/dn/current, /opt/cloudera/hadoop/6/dfs/dn/current, 
/opt/cloudera/hadoop/7/dfs/dn/current, /opt/cloudera/hadoop/8/dfs/dn/current, 
/opt/cloudera/hadoop/9/dfs/dn/current]'}, 
localName='innovationdatanode03.cof.ds.capitalone.com:1004', 
datanodeUuid='e8a11fe2-300f-4e78-9211-f2ee41af6b8c', 
xmitsInProgress=0}:Exception transfering block 
BP-1458718292-10.37.248.67-1398976716371:blk_1076854538_3118445 to mirror 
10.37.248.63:1004: java.net.SocketTimeoutException: 65000 millis timeout while 
waiting for channel to be ready for read. ch : 
java.nio.channels.SocketChannel[connected local=/10.37.248.59:43692 
remote=/10.37.248.63:1004]




Re: ALS failure with size > Integer.MAX_VALUE

2014-11-29 Thread Ganelin, Ilya
Hi Bharath – I’m unsure if this is your problem, but the 
MatrixFactorizationModel in MLlib, which is the underlying component for ALS, 
expects your user/product fields to be integers. Specifically, the input to ALS 
is an RDD[Rating] and Rating is an (Int, Int, Double). I am wondering if 
perhaps one of your identifiers exceeds Int.MaxValue; could you write a quick check 
for that?
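
For illustration, a minimal sketch of such a check (the input path and 
comma-separated format are assumptions; the point is simply to parse the raw ids as 
Long before they are narrowed to Int for Rating):

val rawTriples = sc.textFile("hdfs:///path/to/ratings").map { line =>
  val Array(u, p, r) = line.split(",")
  (u.toLong, p.toLong, r.toDouble)
}

val overflowing = rawTriples.filter { case (u, p, _) =>
  u > Int.MaxValue || p > Int.MaxValue
}.count()

println(s"$overflowing records have user or product ids larger than Int.MaxValue")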

I have been running a very similar use case to yours (with more constrained 
hardware resources) and I haven’t seen this exact problem but I’m sure we’ve 
seen similar issues. Please let me know if you have other questions.

From: Bharath Ravi Kumar reachb...@gmail.commailto:reachb...@gmail.com
Date: Thursday, November 27, 2014 at 1:30 PM
To: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: ALS failure with size > Integer.MAX_VALUE

We're training a recommender with ALS in mllib 1.1 against a dataset of 150M 
users and 4.5K items, with the total number of training records being 1.2 
Billion (~30GB data). The input data is spread across 1200 partitions on HDFS. 
For the training, rank=10, and we've configured {number of user data blocks = 
number of item data blocks}. The number of user/item blocks was varied between 
50 and 1200. Irrespective of the block size (e.g. at 1200 blocks each), there 
are at least a couple of tasks that end up shuffle reading > 9.7G each in the 
aggregate stage (ALS.scala:337) and failing with the following exception:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:745)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:108)
at org.apache.spark.storage.DiskStore.getValues(DiskStore.scala:124)
at 
org.apache.spark.storage.BlockManager.getLocalFromDisk(BlockManager.scala:332)
at 
org.apache.spark.storage.BlockFetcherIterator$BasicBlockFetcherIterator$$anonfun$getLocalBlocks$1.apply(BlockFetcherIterator.scala:204)



As for the data, on the user side, the degree of a node in the connectivity 
graph is relatively small. However, on the item side, 3.8K out of the 4.5K 
items are connected to 10^5 users each on an average, with 100 items being 
connected to nearly 10^8 users. The rest of the items are connected to less 
than 10^5 users. With such a skew in the connectivity graph, I'm unsure if 
additional memory or variation in the block sizes would help (considering my 
limited understanding of the implementation in mllib). Any suggestion to 
address the problem?


The test is being run on a standalone cluster of 3 hosts, each with 100G RAM and 
24 cores dedicated to the application. The additional configs I made, specific 
to the shuffle and task-failure reduction, are as follows:

spark.core.connection.ack.wait.timeout=600
spark.shuffle.consolidateFiles=true
spark.shuffle.manager=SORT


The job execution summary is as follows:

Active Stages:

Stage id 2,  aggregate at ALS.scala:337, duration 55 min, Tasks 1197/1200 (3 
failed), Shuffle Read :  141.6 GB

Completed Stages (5)

Stage Id  Description                                       Duration  Tasks      Input    Shuffle Read  Shuffle Write
6         org.apache.spark.rdd.RDD.flatMap(RDD.scala:277)   12 min    1200/1200  29.9 GB  1668.4 MB     186.8 GB
5         mapPartitionsWithIndex at ALS.scala:250
3         map at ALS.scala:231
0         aggregate at ALS.scala:337
1         map at ALS.scala:228


Thanks,
Bharath




Cancelled Key Exceptions on Massive Join

2014-11-14 Thread Ganelin, Ilya
Hello all. I have been running a Spark Job that eventually needs to do a large 
join.

24 million x 150 million

A broadcast join is infeasible in this instance clearly, so I am instead 
attempting to do it with Hash Partitioning by defining a custom partitioner as:

class RDD2Partitioner(partitions: Int) extends HashPartitioner(partitions) {

  override def getPartition(key: Any): Int = key match {
    case k: Tuple2[Int, String] => super.getPartition(k._1)
    case _ => super.getPartition(key)
  }

}
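
For illustration, a minimal usage sketch (the two tiny RDDs below stand in for the 
real 24M- and 150M-row datasets; both sides are keyed by (Int, String) and use the 
same partitioner instance, so the subsequent join does not have to reshuffle either 
input; a shell session with sc is assumed):

val partitioner = new RDD2Partitioner(1200)

val rddA = sc.parallelize(Seq(((1, "a"), 1.0), ((2, "b"), 2.0))).partitionBy(partitioner)
val rddB = sc.parallelize(Seq(((1, "a"), "x"), ((2, "b"), "y"))).partitionBy(partitioner)

val joined = rddA.join(rddB)   // co-partitioned join; no extra shuffle of the inputs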

I then partition both arrays using this partitioner. However, the job 
eventually fails with the following exception, which, if I had to guess, indicates 
that a network connection was interrupted during the shuffle stage, causing 
things to get lost and ultimately resulting in a fetch failure:

14/11/14 12:56:21 INFO ConnectionManager: Removing ReceivingConnection to 
ConnectionManagerId(innovationdatanode08.cof.ds.capitalone.com,37590)
14/11/14 12:56:21 INFO ConnectionManager: Key not valid ? 
sun.nio.ch.SelectionKeyImpl@7369b398
14/11/14 12:56:21 INFO ConnectionManager: key already cancelled ? 
sun.nio.ch.SelectionKeyImpl@7369b398
java.nio.channels.CancelledKeyException
at 
org.apache.spark.network.ConnectionManager.run(ConnectionManager.scala:386)
at 
org.apache.spark.network.ConnectionManager$$anon$4.run(ConnectionManager.scala:139)


In the spark UI, I still see a substantial amount of shuffling going on at this 
stage, I am wondering if I’m perhaps using the partitioner incorrectly?




RE: Spark- How can I run MapReduce only on one partition in an RDD?

2014-11-13 Thread Ganelin, Ilya
Why do you only want the third partition? You can access individual partitions 
using the partitions() function. You can also filter your data with the 
filter() function so that it only contains the data you care about. Moreover, when you 
create your RDDs, unless you define a custom partitioner, you have no way of 
controlling which data ends up in partition #3. Therefore, there is almost no reason 
to want to operate on an individual partition.
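
That said, if operating on a specific partition really is required, a minimal sketch 
with mapPartitionsWithIndex (not mentioned above; the path is a placeholder and a 
shell session with sc is assumed) keeps only the elements of the third partition:

val rdd = sc.textFile("hdfs:///path/to/input")
val thirdPartitionOnly = rdd.mapPartitionsWithIndex { (idx, iter) =>
  if (idx == 2) iter else Iterator.empty   // partition indices start at 0
}
println(thirdPartitionOnly.count())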

-Original Message-
From: Tim Chou [timchou@gmail.commailto:timchou@gmail.com]
Sent: Thursday, November 13, 2014 06:01 PM Eastern Standard Time
To: user@spark.apache.org
Subject: Spark- How can I run MapReduce only on one partition in an RDD?

Hi All,

I use textFile to create a RDD. However, I don't want to handle the whole data 
in this RDD. For example, maybe I only want to solve the data in 3rd partition 
of the RDD.

How can I do it? Here are some possible solutions that I'm thinking:
1. Create multiple RDDs when reading the file
2.  Run MapReduce functions with the specific partition for an RDD.

However, I cannot find any appropriate function.

Thank you and wait for your suggestions.

Best,
Tim




RE: Fwd: Why is Spark not using all cores on a single machine?

2014-11-07 Thread Ganelin, Ilya
To set the number of Spark cores used you must set two parameters in the actual 
spark-submit script: --num-executors (the number of executors to launch) 
and --executor-cores (the number of cores per executor). Please see the Spark 
configuration and tuning pages for more details.


-Original Message-
From: ll [duy.huynh@gmail.commailto:duy.huynh@gmail.com]
Sent: Saturday, November 08, 2014 12:05 AM Eastern Standard Time
To: u...@spark.incubator.apache.org
Subject: Re: Fwd: Why is Spark not using all cores on a single machine?


hi.  i did use local[8] as below, but it still ran on only 1 core.

val sc = new SparkContext(new
SparkConf().setMaster("local[8]").setAppName("abc"))

any advice is much appreciated.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Fwd-Why-is-Spark-not-using-all-cores-on-a-single-machine-tp1638p18397.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





Re: Redploying a spark streaming application

2014-11-06 Thread Ganelin, Ilya
You’ve basically got it.
The deployment step can be as simple as scp-ing the jar to a known location on the 
server and then executing a run script on the server that actually runs 
spark-submit.

From: Ashic Mahtab as...@live.commailto:as...@live.com
Date: Thursday, November 6, 2014 at 5:01 PM
To: user@spark.apache.orgmailto:user@spark.apache.org 
user@spark.apache.orgmailto:user@spark.apache.org
Subject: Redploying a spark streaming application

Hello,
I'm trying to find the best way of redeploying a spark streaming application. 
Ideally, I was thinking of a scenario where a build server packages up a jar 
and a deployment step submits it to a Spark Master. On the next successful 
build, the next version would get deployed taking down the previous version. 
What would be the best way of achieving this?

Thanks,
Ashic.




Re: Key-Value decomposition

2014-11-03 Thread Ganelin, Ilya
Very straightforward:

You want to use cartesian.
If you have two RDDs - RDD_1("A") and RDD_2(1, 2, 3)

RDD_1.cartesian(RDD_2) will generate the cross product between the two
RDDs and you will have
RDD_3(("A", 1), ("A", 2), ("A", 3))
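
For illustration, a minimal runnable sketch that goes from the asker's data to that 
result (valid as written only because there is a single key; with many keys a flatMap 
over the split values would be more appropriate; a shell session with sc is assumed):

val data = sc.parallelize(Seq(("A", "1;2;3")))
val keys = data.keys                              // RDD("A")
val values = data.values.flatMap(_.split(";"))    // RDD("1", "2", "3")
val pairs = keys.cartesian(values)                // ("A","1"), ("A","2"), ("A","3")
pairs.collect().foreach(println)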


On 11/3/14, 11:38 AM, david david...@free.fr wrote:

Hi,

  I'm a newbie in Spark and face the following use case:

   val data = Array(("A", "1;2;3"))
   val rdd = sc.parallelize(data)

// Something here to produce an RDD of (key, value):
// ("A", 1), ("A", 2), ("A", 3)

Does anybody know how to do this?

Thanks



--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Key-Value-decompositio
n-tp17966.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Repartitioning by partition size, not by number of partitions.

2014-10-31 Thread Ganelin, Ilya
Hi Jan. I've actually written a function recently to do precisely that using 
the RDD.randomSplit function. You just need to calculate how big each element 
of your data is, then how many elements can fit in each RDD, to populate the 
input to randomSplit. Unfortunately, in my case I wind up with GC errors on 
large data doing this and am still debugging :)
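
For illustration, a minimal sketch of that sizing logic (all numbers and the size 
estimate are rough assumptions; randomSplit then yields roughly equally sized RDDs 
that can be written out or processed one at a time):

val rdd = sc.textFile("hdfs:///path/to/many/small/json/files")

val targetChunkBytes = 128L * 1024 * 1024     // desired chunk size, e.g. 128 MB
val sample = rdd.take(1000)                   // rough per-record size estimate from a sample
val avgBytesPerRecord =
  math.max(1L, sample.map(_.getBytes("UTF-8").length.toLong).sum / math.max(1, sample.length))
val recordsPerChunk = math.max(1L, targetChunkBytes / avgBytesPerRecord)
val numChunks = math.max(1, math.ceil(rdd.count().toDouble / recordsPerChunk).toInt)

val chunks = rdd.randomSplit(Array.fill(numChunks)(1.0 / numChunks))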

-Original Message-
From: jan.zi...@centrum.cz [jan.zi...@centrum.czmailto:jan.zi...@centrum.cz]
Sent: Friday, October 31, 2014 06:27 AM Eastern Standard Time
To: user@spark.apache.org
Subject: Repartitioning by partition size, not by number of partitions.


Hi,

I have input data consisting of many very small files, each containing one .json.

For performance reasons (I use PySpark) I have to do repartioning, currently I 
do:

sc.textFile(files).coalesce(100)



The problem is that I have to guess the number of partitions in such a way that 
it's as fast as possible while I am still on the safe side with the RAM memory. 
So this is quite difficult.

For this reason I would like to ask if there is some way to replace 
coalesce(100) by something that creates N partitions of a given size? I went 
through the documentation, but I was not able to find a way to do that.

Thank you in advance for any help or advice.






RE: FileNotFoundException in appcache shuffle files

2014-10-29 Thread Ganelin, Ilya
Hi Ryan - I've been fighting the exact same issue for well over a month now. I 
initially saw the issue in 1.0.2 but it persists in 1.1.

Jerry - I believe you are correct that this happens during a pause on 
long-running jobs on a large data set. Are there any parameters that you 
suggest tuning to mitigate these situations?

Also, you ask if there are any other exceptions - for me this error has tended 
to follow an earlier exception, which supports the theory that it is a symptom 
of an earlier problem.

My understanding is as follows - during a shuffle step an executor fails and 
doesn't report its output - next, during the reduce step, that output can't be 
found where expected and rather than rerunning the failed execution, Spark goes 
down.

We can add my email thread to your reference list :
https://mail-archives.apache.org/mod_mbox/incubator-spark-user/201410.mbox/CAM-S9zS-+-MSXVcohWEhjiAEKaCccOKr_N5e0HPXcNgnxZd=h...@mail.gmail.com

-Original Message-
From: Shao, Saisai [saisai.s...@intel.commailto:saisai.s...@intel.com]
Sent: Wednesday, October 29, 2014 01:46 AM Eastern Standard Time
To: Ryan Williams
Cc: user
Subject: RE: FileNotFoundException in appcache shuffle files

Hi Ryan,

This is an issue with sort-based shuffle, not consolidated hash-based shuffle. 
I suspect this issue mostly occurs when the Spark cluster is in an abnormal 
situation, for example after a long GC pause or something similar. You can check 
the system status and whether there are any other exceptions besides this one.

Thanks
Jerry

From: nobigdealst...@gmail.com [mailto:nobigdealst...@gmail.com] On Behalf Of 
Ryan Williams
Sent: Wednesday, October 29, 2014 1:31 PM
To: user
Subject: FileNotFoundException in appcache shuffle files

My job is failing with the following error:
14/10/29 02:59:14 WARN scheduler.TaskSetManager: Lost task 1543.0 in stage 3.0 
(TID 6266, demeter-csmau08-19.demeter.hpc.mssm.edu): 
java.io.FileNotFoundException: 
/data/05/dfs/dn/yarn/nm/usercache/willir31/appcache/application_1413512480649_0108/spark-local-20141028214722-43f1/26/shuffle_0_312_0.index
 (No such file or directory)
java.io.FileOutputStream.open(Native Method)
java.io.FileOutputStream.init(FileOutputStream.java:221)

org.apache.spark.storage.DiskBlockObjectWriter.open(BlockObjectWriter.scala:123)

org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:192)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:733)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4$$anonfun$apply$2.apply(ExternalSorter.scala:732)
scala.collection.Iterator$class.foreach(Iterator.scala:727)

org.apache.spark.util.collection.ExternalSorter$IteratorForPartition.foreach(ExternalSorter.scala:790)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:732)

org.apache.spark.util.collection.ExternalSorter$$anonfun$writePartitionedFile$4.apply(ExternalSorter.scala:728)
scala.collection.Iterator$class.foreach(Iterator.scala:727)
scala.collection.AbstractIterator.foreach(Iterator.scala:1157)

org.apache.spark.util.collection.ExternalSorter.writePartitionedFile(ExternalSorter.scala:728)

org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:70)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)

org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:56)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:181)

java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:744)

I get 4 of those on task 1543 before the job aborts. Interspersed in the 4 
task-1543 failures are a few instances of this failure on another task. Here is 
the entire App Master stdout dump: 
https://www.dropbox.com/s/m8c4o73o0bh7kf8/adam.108?dl=0 (~2MB; stack traces 
towards the bottom, of course). I am running {Spark 1.1, Hadoop 2.3.0}.

Here's a summary of the RDD manipulations I've done up to the point of failure:

 *   val A = [read a file in 1419 shards]

     *   the file is 177GB compressed but ends up being ~5TB uncompressed / 
         hydrated into scala objects (I think; see below for more discussion on 
         this point).
     *   some relevant Spark options:

         *   spark.default.parallelism=2000
         *   --master yarn-client
         *   --executor-memory 50g
         *   --driver-memory 10g
         *   --num-executors 100
         *   --executor-cores 4

 *   A.repartition(3000)

     *   3000 was chosen in an attempt to mitigate shuffle-disk-spillage that 
         previous job attempts with 1000 or 1419 shards were mired in

 

GC Issues with randomSplit on large dataset

2014-10-29 Thread Ganelin, Ilya
Hey all – I'm not necessarily writing to get a fix here, but more to get an 
understanding of what's going on internally.

I wish to take a cross-product of two very large RDDs (using cartesian), the 
product of which is well in excess of what can be stored on disk. Clearly that 
is intractable, thus my solution is to do things in batches - essentially I can 
take the cross product of a small piece of the first data set with the entirety 
of the other. To do this, I calculate how many items can fit into 1 gig of 
memory. Next, I use RDD.randomSplit() to partition the first data set. The 
issue is that I am trying to partition an RDD of several million items into 
several million partitions, which throws the error shown below.

I would like to understand the internals of what's going on here so that I can 
adjust my approach accordingly. A rough sketch of the batching approach follows, 
and after it the error. Thanks in advance.
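Sketch of the batching approach (illustrative only; the per-element size and
1 GB budget are assumptions, and the inputs and paths are placeholders):

import org.apache.spark.rdd.RDD

val left: RDD[String]  = sc.textFile("hdfs:///path/to/left")    // placeholder inputs
val right: RDD[String] = sc.textFile("hdfs:///path/to/right")

val bytesPerElement  = 200L                                     // assumed average element size
val batchBudgetBytes = 1L * 1024 * 1024 * 1024                  // ~1 GB of the first data set per batch
val elementsPerBatch = batchBudgetBytes / bytesPerElement
val numBatches       = math.max(1, (left.count() / elementsPerBatch).toInt + 1)

// randomSplit takes relative weights; equal weights give roughly equal-sized pieces.
val batches = left.randomSplit(Array.fill(numBatches)(1.0))

batches.zipWithIndex.foreach { case (batch, i) =>
  // Cross one small piece with the entire second data set and write it out,
  // so no single batch of the product has to be held at once.
  batch.cartesian(right).saveAsTextFile(s"hdfs:///path/to/output/batch-$i")
}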


14/10/29 22:17:44 ERROR ActorSystemImpl: Uncaught fatal error from thread 
[sparkDriver-akka.actor.default-dispatcher-16] shutting down ActorSystem 
[sparkDriver]
java.lang.OutOfMemoryError: GC overhead limit exceeded
at com.google.protobuf_spark.ByteString.toByteArray(ByteString.java:213)
at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:24)
at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Exception in thread main java.lang.OutOfMemoryError: GC overhead limit 
exceeded
at java.util.Arrays.copyOfRange(Arrays.java:2694)
at java.lang.String.init(String.java:203)
at java.lang.String.substring(String.java:1913)
at java.lang.String.subSequence(String.java:1946)
at java.util.regex.Matcher.getSubSequence(Matcher.java:1245)
at java.util.regex.Matcher.group(Matcher.java:490)
at java.util.Formatter$FormatSpecifier.init(Formatter.java:2675)
at java.util.Formatter.parse(Formatter.java:2528)
at java.util.Formatter.format(Formatter.java:2469)
at java.util.Formatter.format(Formatter.java:2423)
at java.lang.String.format(String.java:2790)
at scala.collection.immutable.StringLike$class.format(StringLike.scala:266)
at scala.collection.immutable.StringOps.format(StringOps.scala:31)
at org.apache.spark.util.Utils$.getCallSite(Utils.scala:944)
at org.apache.spark.rdd.RDD.init(RDD.scala:1227)
at org.apache.spark.rdd.RDD.init(RDD.scala:83)
at 
org.apache.spark.rdd.PartitionwiseSampledRDD.init(PartitionwiseSampledRDD.scala:47)
at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:378)
at org.apache.spark.rdd.RDD$$anonfun$randomSplit$1.apply(RDD.scala:377)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.rdd.RDD.randomSplit(RDD.scala:379)






Building Spark against Cloudera 5.2.0 - Failure

2014-10-28 Thread Ganelin, Ilya
Hello all, I am attempting to manually build the master branch of Spark against 
Cloudera’s 5.2.0 deployment. To do this I am running:

mvn -Pyarn -Dhadoop.version=2.5.0-cdh5.2.0 -DskipTests clean package

The build completes successfully and then I run:
mvn -Pyarn -Phadoop.version=2.5.0-cdh5.2.0 test

Both on the cluster and on the local deployment, however, this sequence fails.

Locally, I have better luck with the tests but there are still failures.

On the cluster I get some passed tests but also numerous failures.
Summary is below:
[INFO] Reactor Summary:
[INFO]
[INFO] Spark Project Parent POM .. SUCCESS [  2.481 s]
[INFO] Spark Project Core  FAILURE [11:02 min]
[INFO] Spark Project Bagel ... SKIPPED
[INFO] Spark Project GraphX .. SKIPPED
[INFO] Spark Project Streaming ... SKIPPED
[INFO] Spark Project ML Library .. SKIPPED
[INFO] Spark Project Tools ... SKIPPED
[INFO] Spark Project Catalyst  SKIPPED
[INFO] Spark Project SQL . SKIPPED
[INFO] Spark Project Hive  SKIPPED
[INFO] Spark Project REPL  SKIPPED
[INFO] Spark Project YARN Parent POM . SKIPPED
[INFO] Spark Project YARN Stable API . SKIPPED
[INFO] Spark Project Assembly  SKIPPED
[INFO] Spark Project External Twitter  SKIPPED
[INFO] Spark Project External Kafka .. SKIPPED
[INFO] Spark Project External Flume Sink . SKIPPED
[INFO] Spark Project External Flume .. SKIPPED
[INFO] Spark Project External ZeroMQ . SKIPPED
[INFO] Spark Project External MQTT ... SKIPPED
[INFO] Spark Project Examples  SKIPPED
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11:06 min
[INFO] Finished at: 2014-10-28T17:21:27-05:00
[INFO] Final Memory: 69M/301M
[INFO] ------------------------------------------------------------------------
[WARNING] The requested profile hadoop.version=2.5.0-cdh5.2.0 could not be 
activated because it does not exist.
[ERROR] Failed to execute goal org.scalatest:scalatest-maven-plugin:1.0:test 
(test) on project spark-core_2.10: There are test failures - [Help 1]


What I would like to know is whether the Spark tests have been successfully run 
against the Cloudera deployment. Please let me know - thank you all.
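One thing worth noting from the Maven output above: the warning says the profile 
hadoop.version=2.5.0-cdh5.2.0 could not be activated because it does not exist, 
which suggests the test invocation passed the Hadoop version as a profile (-P) 
rather than as a property (-D), as the package command does. If that is the 
cause, the test command would presumably be:

mvn -Pyarn -Dhadoop.version=2.5.0-cdh5.2.0 test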




Re: Spark-submt job Killed

2014-10-28 Thread Ganelin, Ilya
Hi Ami - I suspect that your code is completing because you have nothing
to actually force resolution of your job. Spark executes lazily, so for
example, if you have a bunch of maps in sequence but nothing else, Spark
will not actually execute anything.

Try adding an RDD.count() on the last RDD that you generate with spark to
ensure that something is forcing execution at the end.
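For example (a minimal sketch; the input path and myTransform are placeholders):

val data = sc.textFile("hdfs:///path/to/input")       // placeholder input
val transformed = data.map(myTransform)               // lazy: nothing has executed yet
println(s"records: ${transformed.count()}")           // count() is an action and forces execution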

On 10/28/14, 5:32 PM, akhandeshi ami.khande...@gmail.com wrote:

I recently started seeing this new problem where spark-submit is terminated
by a Killed message, but there is no error message indicating what happened. I
have enabled logging in the Spark configuration. Has anyone seen this or does
anyone know how to troubleshoot it?




--
View this message in context:
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-submt-job-Killed
-tp17560.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org






-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Is it possible to call a transform + action inside an action?

2014-10-28 Thread Ganelin, Ilya
You cannot have nested RDD transformations in Scala Spark. The issue is that 
when the outer operation is distributed to the cluster and kicks off a new job 
(the inner query) the inner job no longer has the context for the outer job. 
The way around this is to either do a join on two RDDs or to store a 
serializable lookup structure (not an RDD) in memory and have that sent to the 
nodes during execution. You can even do this efficiently by defining a 
broadcast variable.

I apologize for not providing a fuller example - am on my phone :)
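That said, a rough sketch of the broadcast approach (placeholder names, typed 
from memory and untested; records is assumed to be an RDD[String]):

// Collect the small lookup data on the driver, broadcast it, and reference the
// broadcast value inside the closure instead of touching another RDD.
val badWordsSet = sc.textFile("/filters/badwords.txt").collect().toSet
val badWordsBC  = sc.broadcast(badWordsSet)

val flagged = records.map { record =>
  val hits = badWordsBC.value.count(word => record.contains(word))
  (record, hits)
}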




-Original Message-
From: kpeng1 [kpe...@gmail.com]
Sent: Tuesday, October 28, 2014 06:34 PM Eastern Standard Time
To: u...@spark.incubator.apache.org
Subject: Is it possible to call a transform + action inside an action?


I am currently writing an application that uses Spark Streaming.  What I am
trying to do is basically read in a few files (I do this by using the Spark
context's textFile) and then process those files inside an action that I apply
to a streaming RDD.  Here is the main code below:

def main(args: Array[String]) {
  val sparkConf = new SparkConf().setAppName("EmailIngestion")
  val ssc = new StreamingContext(sparkConf, Seconds(1))
  val sc = new SparkContext(sparkConf)
  val badWords = sc.textFile("/filters/badwords.txt")
  val urlBlacklist = sc.textFile("/filters/source_url_blacklist.txt")
  val domainBlacklist = sc.textFile("/filters/domain_blacklist.txt")
  val emailBlacklist = sc.textFile("/filters/blacklist.txt")


  val lines = FlumeUtils.createStream(ssc, "localhost", 4545,
    StorageLevel.MEMORY_ONLY_SER_2)

  lines.foreachRDD(rdd => rdd.foreachPartition(json =>
    Processor.ProcessRecord(json, badWords, urlBlacklist, domainBlacklist,
      emailBlacklist)))
  ssc.start()
  ssc.awaitTermination()
}

Here is the code for processing the files found inside the ProcessRecord
method:
val emailBlacklistCnt = emailBlacklist.filter(black =>
  black.contains(email)).count

It looks like this throws an exception.  Is it possible to do this?






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Is-it-possible-to-call-a-transform-action-inside-an-action-tp17568.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org





RE: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994

2014-10-28 Thread Ganelin, Ilya
Have you checked for any global variables in your scope? Remember that even if 
variables are not passed to the function they will be included as part of the 
context passed to the nodes. If you can't zen out what is breaking then try to 
simplify what you're doing. Set up a simple test call (like a map) with the 
same objects you're trying to serialize and see if those work.
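For example, an isolated test might look roughly like this (a sketch; MyRecord 
and the registrator class name are stand-ins for your own classes):

import org.apache.spark.SparkContext._                // pair-RDD operations in Spark 1.x

case class MyRecord(id: String, value: Int)           // stand-in for one of your domain classes

val conf = new org.apache.spark.SparkConf()
  .setAppName("KryoCheck")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "com.example.MyRegistrator")   // your registrator
val sc = new org.apache.spark.SparkContext(conf)

val probe = sc.parallelize(1 to 1000).map(i => MyRecord(i.toString, i))
// A shuffle forces the records through the configured serializer on the cluster,
// so an unregistered or unserializable class should surface here.
probe.map(r => (r.id, r)).groupByKey().count()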

-Original Message-
From: Steve Lewis [lordjoe2...@gmail.com]
Sent: Tuesday, October 28, 2014 10:46 PM Eastern Standard Time
To: user@spark.apache.org
Subject: com.esotericsoftware.kryo.KryoException: Encountered unregistered 
class ID: 13994

 A cluster I am running on keeps getting a KryoException. Unlike the Java 
serializer, the Kryo exception gives no clue as to which class is causing the 
error. The application runs properly locally but not on the cluster, and I have 
my own custom KryoRegistrator and register several dozen classes - essentially 
everything I can find which implements Serializable.
How do I find what the KryoSerializer issue is?
I would love to see a list of all the classes Kryo serialized.




RE: Use RDD like a Iterator

2014-10-28 Thread Ganelin, Ilya
Would RDD.map() do what you need? It will apply a function to every element of 
the RDD and return a resulting RDD.
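For instance (a sketch; processRecord and isInteresting are placeholders):

// map is lazy: the function only runs when an action executes, and elements are
// pulled through one at a time within each partition.
val processed = rdd.map(processRecord)

// mapPartitions hands you the partition's Iterator[T] directly if you want to
// stream through the elements yourself.
val filtered = rdd.mapPartitions(iter => iter.filter(isInteresting))

filtered.count()                                      // the action that triggers the lazy work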

-Original Message-
From: Zhan Zhang [zzh...@hortonworks.com]
Sent: Tuesday, October 28, 2014 11:23 PM Eastern Standard Time
To: Dai, Kevin
Cc: user@spark.apache.org
Subject: Re: Use RDD like a Iterator

I think it is already lazily computed, or do you mean something else? Following 
is the signature of compute in RDD

 def compute(split: Partition, context: TaskContext): Iterator[T]

Thanks.

Zhan Zhang

On Oct 28, 2014, at 8:15 PM, Dai, Kevin yun...@ebay.com wrote:

Hi, ALL

I have an RDD[T]; can I use it like an iterator?
That means I can compute every element of this RDD lazily.

Best Regards,
Kevin.






Re: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark 1.1.0

2014-10-27 Thread Ganelin, Ilya
Hi Xiangrui - I can certainly save the data before ALS - that would be a
great first step. Why would reducing the number of partitions help? I
would very much like to understand what's happening internally. Also, with
regards to Burak's earlier comment, here is the JIRA referencing this
problem.

https://issues.apache.org/jira/browse/SPARK-3080
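Concretely, something like this right before the ALS call (a sketch; the path 
and the ALS parameters are placeholders, and ratings is assumed to be the 
RDD[Rating] that feeds the recommender):

import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Snapshot the exact input that feeds ALS so the failure can be reproduced
// without re-running the upstream pipeline.
ratings.saveAsObjectFile("hdfs:///tmp/als-input")     // placeholder path

// Later, reload the snapshot and train from it directly:
val reloaded = sc.objectFile[Rating]("hdfs:///tmp/als-input")
val model = ALS.train(reloaded, 20, 10, 0.01)         // placeholder rank/iterations/lambda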


On 10/27/14, 6:12 PM, Xiangrui Meng men...@gmail.com wrote:

Could you save the data before ALS and try to reproduce the problem?
You might try reducing the number of partitions and not using Kryo
serialization, just to narrow down the issue. -Xiangrui

On Mon, Oct 27, 2014 at 1:29 PM, Ilya Ganelin ilgan...@gmail.com wrote:
 Hi Burak.

 I always see this error. I'm running the CDH 5.2 version of Spark
1.1.0. I
 load my data from HDFS. By the time it hits the recommender it had gone
 through many spark operations.

 On Oct 27, 2014 4:03 PM, Burak Yavuz bya...@stanford.edu wrote:

 Hi,

 I've come across this multiple times, but not in a consistent manner. I
 found it hard to reproduce. I have a jira for it: SPARK-3080

 Do you observe this error every single time? Where do you load your
data
 from? Which version of Spark are you running?
 Figuring out the similarities may help in pinpointing the bug.

 Thanks,
 Burak

 - Original Message -
 From: Ilya Ganelin ilgan...@gmail.com
 To: user user@spark.apache.org
 Sent: Monday, October 27, 2014 11:36:46 AM
 Subject: MLLib ALS ArrayIndexOutOfBoundsException with Scala Spark
1.1.0

 Hello all - I am attempting to run MLLib's ALS algorithm on a
substantial
 test vector - approx. 200 million records.

 I have resolved a few issues I've had with regards to garbage
collection,
 KryoSeralization, and memory usage.

 I have not been able to get around this issue I see below however:


  java.lang.
  ArrayIndexOutOfBoundsException: 6106
 
 
  
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mlli
b$recommendation$ALS$$updateBlock$1.apply$mcVI$sp(ALS.
  scala:543)
  
scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141)
  org.apache.spark.mllib.recommendation.ALS.org
  $apache$spark$mllib$recommendation$ALS$$updateBlock(ALS.scala:537)
 
 
  
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mlli
b$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:505)
 
 
  
org.apache.spark.mllib.recommendation.ALS$$anonfun$org$apache$spark$mlli
b$recommendation$ALS$$updateFeatures$2.apply(ALS.scala:504)
 
 
  
org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValu
esRDD.scala:31)
 
 
  
org.apache.spark.rdd.MappedValuesRDD$$anonfun$compute$1.apply(MappedValu
esRDD.scala:31)
  scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
  scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 
 
  
org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(Externa
lAppendOnlyMap.scala:144)
 
 
  
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.
scala:159)
 
 
  
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$5.apply(CoGroupedRDD.
scala:158)
 
 
  
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(Tra
versableLike.scala:772)
 
 
  
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.sca
la:59)
 
  scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
 
 
  
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scal
a:771)
 
  org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:158)
  
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
 
  
org.apache.spark.rdd.MappedValuesRDD.compute(MappedValuesRDD.scala:31)
  
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  org.apache.spark.rdd.RDD.iterator(RDD.scala:229)


 I do not have any negative indices or indices that exceed Int-Max.

 I have partitioned the input data into 300 partitions and my Spark
config
 is below:

 .set("spark.executor.memory", "14g")
   .set("spark.storage.memoryFraction", "0.8")
   .set("spark.serializer",
 "org.apache.spark.serializer.KryoSerializer")
   .set("spark.kryo.registrator", "MyRegistrator")
   .set("spark.core.connection.ack.wait.timeout", "600")
   .set("spark.akka.frameSize", "50")
   .set("spark.yarn.executor.memoryOverhead", "1024")

 Does anyone have any suggestions as to why I'm seeing the above error or
 how to get around it?
 It may be possible to upgrade to the latest version of Spark but the
 mechanism for doing so in our environment isn't obvious yet.

 -Ilya Ganelin



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



