Re: SparkSQL can not use SchemaRDD from Hive

2014-07-28 Thread Kevin Jung
Thanks for your fast replies.
I was wrong about HiveContext.

val hive = new org.apache.spark.sql.hive.HiveContext(sc) 
var sample = hive.hql("select * from sample10")
var countHive = sample.count()
hive.registerRDDAsTable(sample,"temp") 
hive.sql("select * from temp").count()

It works fine now.

Thanks,
Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-can-not-use-SchemaRDD-from-Hive-tp10841p10847.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Joining spark user group

2014-07-28 Thread Akhil Das
Welcome aboard!

Thanks
Best Regards


On Tue, Jul 29, 2014 at 11:46 AM, jitendra shelar <
jitendra.shelar...@gmail.com> wrote:

>
>


Re: SparkSQL can not use SchemaRDD from Hive

2014-07-28 Thread Zongheng Yang
As Hao already mentioned, using 'hive' (the HiveContext) throughout would
work.

On Monday, July 28, 2014, Cheng, Hao  wrote:

> In your code snippet, "sample" is actually a SchemaRDD, and a SchemaRDD is
> bound to a specific SQLContext at runtime; I don't think we can
> manipulate/share a SchemaRDD across SQLContext instances.
>
> -Original Message-
> From: Kevin Jung [mailto:itsjb.j...@samsung.com ]
> Sent: Tuesday, July 29, 2014 1:47 PM
> To: u...@spark.incubator.apache.org 
> Subject: SparkSQL can not use SchemaRDD from Hive
>
> Hi
> I got an error message while using Hive and SparkSQL.
> This is the code snippet I used.
>
> (in spark-shell, 1.0.0)
> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
> import sqlContext._
> val hive = new org.apache.spark.sql.hive.HiveContext(sc)
> var sample = hive.hql("select * from sample10") // creates a SchemaRDD; I have a table 'sample10' in Hive
> var countHive = sample.count() // It works
> sqlContext.registerRDDAsTable(sample,"temp")
> sqlContext.sql("select * from temp").count() // It gives me an error message
> "java.lang.RuntimeException: Table Not Found: sample10"
>
> I don't know why this happens. Does SparkSQL conflict with Hive?
>
> Thanks,
> Kevin
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-can-not-use-SchemaRDD-from-Hive-tp10841.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Joining spark user group

2014-07-28 Thread jitendra shelar




RE: HiveContext is creating metastore warehouse locally instead of in hdfs

2014-07-28 Thread Cheng, Hao
I have run this before; the hive-site.xml actually works this way for me (the
tricky part happens in the new HiveConf(classOf[SessionState])). Can you double-check
whether hive-site.xml can be loaded from the classpath? It is supposed to appear at
the root of the classpath.
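
A minimal sketch of such a check in spark-shell, assuming only that hive-site.xml should be visible as a classpath resource (the resource name is the one assumption here):

// Hedged sketch: print where (or whether) hive-site.xml resolves on the driver's classpath.
// A null result means the file will not be picked up when the HiveContext is created.
val hiveSiteUrl = Thread.currentThread().getContextClassLoader.getResource("hive-site.xml")
println("hive-site.xml resolved to: " + hiveSiteUrl)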

-Original Message-
From: nikroy16 [mailto:nikro...@gmail.com] 
Sent: Tuesday, July 29, 2014 12:51 PM
To: u...@spark.incubator.apache.org
Subject: HiveContext is creating metastore warehouse locally instead of in hdfs

Hi,

Even though hive.metastore.warehouse.dir in hive-site.xml is set to the default
user/hive/warehouse and the permissions are correct in HDFS, HiveContext seems
to be creating the metastore locally instead of in HDFS. After looking into the
Spark code, I found the following in HiveContext.scala:

  /**
   * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
   * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
   * set() or a SET command inside hql() or sql() will be set in the SQLConf *as well as*
   * in the HiveConf.
   */
  @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])

  @transient protected[hive] lazy val sessionState = {
    val ss = new SessionState(hiveconf)
    set(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
    ss
  }


It seems as though when a HiveContext is created, it is launched without any
configuration, and hive-site.xml is not used to set properties. It looks like I
can set properties after creation by using the hql() method, but what I am looking
for is for the HiveContext to be initialized according to the configuration in
hive-site.xml at the time of initialization. Any help would be greatly
appreciated!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-is-creating-metastore-warehouse-locally-instead-of-in-hdfs-tp10838.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: SparkSQL can not use SchemaRDD from Hive

2014-07-28 Thread Cheng, Hao
In your code snippet, "sample" is actually a SchemaRDD, and a SchemaRDD is bound to
a specific SQLContext at runtime; I don't think we can manipulate/share a SchemaRDD
across SQLContext instances.
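
A minimal sketch of the resulting workaround, based on the snippet below and on Kevin's follow-up at the top of this digest (the table name 'sample10' comes from the original report):

// Keep everything on one HiveContext so the registered table is visible to the query.
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
val sample = hive.hql("select * from sample10")   // SchemaRDD bound to `hive`
hive.registerRDDAsTable(sample, "temp")           // register on the same context
hive.hql("select * from temp").count()            // works; a separate SQLContext would not see "temp"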

-Original Message-
From: Kevin Jung [mailto:itsjb.j...@samsung.com] 
Sent: Tuesday, July 29, 2014 1:47 PM
To: u...@spark.incubator.apache.org
Subject: SparkSQL can not use SchemaRDD from Hive

Hi
I got an error message while using Hive and SparkSQL.
This is the code snippet I used.

(in spark-shell, 1.0.0)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
var sample = hive.hql("select * from sample10") // creates a SchemaRDD; I have a table 'sample10' in Hive
var countHive = sample.count() // It works
sqlContext.registerRDDAsTable(sample,"temp")
sqlContext.sql("select * from temp").count() // It gives me an error message
"java.lang.RuntimeException: Table Not Found: sample10"

I don't know why this happens. Does SparkSQL conflict with Hive?

Thanks,
Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-can-not-use-SchemaRDD-from-Hive-tp10841.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


SparkSQL can not use SchemaRDD from Hive

2014-07-28 Thread Kevin Jung
Hi
I got an error message while using Hive and SparkSQL.
This is the code snippet I used.

(in spark-shell, 1.0.0)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
val hive = new org.apache.spark.sql.hive.HiveContext(sc)
var sample = hive.hql("select * from sample10") // creates a SchemaRDD; I have a table 'sample10' in Hive
var countHive = sample.count() // It works
sqlContext.registerRDDAsTable(sample,"temp")
sqlContext.sql("select * from temp").count() // It gives me an error message
"java.lang.RuntimeException: Table Not Found: sample10"

I don't know why this happens. Does SparkSQL conflict with Hive?

Thanks,
Kevin



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/SparkSQL-can-not-use-SchemaRDD-from-Hive-tp10841.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Joining spark user group

2014-07-28 Thread jitendra shelar



Re: Fraud management system implementation

2014-07-28 Thread jitendra.shelar410
Thanks Sandy.

Should I prefer Java or Scala for implementing the fraud management
system using Spark?
If I use Java, will there be any performance degradation issues?

Regards,
Jitendra



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Re-Fraud-management-system-implementation-tp10787p10839.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


HiveContext is creating metastore warehouse locally instead of in hdfs

2014-07-28 Thread nikroy16
Hi,

Even though hive.metastore.warehouse.dir in hive-site.xml is set to the
default user/hive/warehouse and the permissions are correct in HDFS,
HiveContext seems to be creating the metastore locally instead of in HDFS. After
looking into the Spark code, I found the following in HiveContext.scala:

  /**
   * SQLConf and HiveConf contracts: when the hive session is first initialized, params in
   * HiveConf will get picked up by the SQLConf. Additionally, any properties set by
   * set() or a SET command inside hql() or sql() will be set in the SQLConf *as well as*
   * in the HiveConf.
   */
  @transient protected[hive] lazy val hiveconf = new HiveConf(classOf[SessionState])

  @transient protected[hive] lazy val sessionState = {
    val ss = new SessionState(hiveconf)
    set(hiveconf.getAllProperties) // Have SQLConf pick up the initial set of HiveConf.
    ss
  }


It seems as though when a HiveContext is created, it is launched without any
configuration, and hive-site.xml is not used to set properties. It looks like
I can set properties after creation by using the hql() method, but what I am
looking for is for the HiveContext to be initialized according to the
configuration in hive-site.xml at the time of initialization. Any help would
be greatly appreciated!





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/HiveContext-is-creating-metastore-warehouse-locally-instead-of-in-hdfs-tp10838.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Reading hdf5 formats with pyspark

2014-07-28 Thread Xiangrui Meng
That looks good to me since there is no Hadoop InputFormat for HDF5.
But remember to specify the number of partitions in sc.parallelize to
use all the nodes. You can change `process` to `read` which yields
records one-by-one. Then sc.parallelize(files,
numPartitions).flatMap(read) returns an RDD of records and you can use
it as the start of your pipeline. -Xiangrui

On Mon, Jul 28, 2014 at 9:05 PM, Mohit Singh  wrote:
> Hi,
> We have set up Spark on an HPC system and are trying to implement a data
> pipeline and algorithms.
> The input data is in HDF5 (these are very high-resolution brain images) and
> it can be read via the h5py library in Python. So, my current approach (which
> seems to be working) is writing a function
> def process(filename):
>#logic
>
> and then execute via
> files = [list of filenames]
> sc.parallelize(files).foreach(process)
>
> Is this the right approach??
> --
> Mohit
>
> "When you want success as badly as you want the air, then you will get it.
> There is no other secret of success."
> -Socrates


Re: evaluating classification accuracy

2014-07-28 Thread Xiangrui Meng
Are you using 1.0.0? There was a bug, which was fixed in 1.0.1 and
master. If you don't want to switch to 1.0.1 or master, try to cache
and count test first. -Xiangrui
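
A minimal sketch of that workaround, reusing the variable names from the quoted code below (the only assumption is that `test` and `model` are defined as in the original post):

// Cache and materialize the test set first (per the suggestion above) so that
// zipping predictions with labels lines up under Spark 1.0.0.
val cachedTest = test.cache()
cachedTest.count()                                        // force evaluation before zipping
val prediction = model.predict(cachedTest.map(_.features))
val predictionAndLabel = prediction.zip(cachedTest.map(_.label))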

On Mon, Jul 28, 2014 at 6:07 PM, SK  wrote:
> Hi,
>
> In order to evaluate the ML classification accuracy, I am zipping up the
> prediction and test labels as follows and then comparing the pairs in
> predictionAndLabel:
>
> val prediction = model.predict(test.map(_.features))
> val predictionAndLabel = prediction.zip(test.map(_.label))
>
>
> However, I am finding that predictionAndLabel.count() has fewer elements
> than test.count().  For example, my test vector has 43 elements, but
> predictionAndLabel has only 38 pairs. I have tried other samples and always
> get fewer elements after zipping.
>
> Does zipping the two vectors cause any compression? or is this because of
> the distributed nature of the algorithm (I am running it in local mode on a
> single machine). In order to get the correct accuracy, I need the above
> comparison to be done by a single node on the entire test data (my data is
> quite small). How can I ensure that?
>
> thanks
>
>
>
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/evaluating-classification-accuracy-tp10822.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: KMeans: expensiveness of large vectors

2014-07-28 Thread Xiangrui Meng
Great! Thanks for testing the new features! -Xiangrui

On Mon, Jul 28, 2014 at 8:58 PM, durin  wrote:
> Hi Xiangrui,
>
> using the current master meant a huge improvement for my task. Something
> that did not even finish before (training with 120G of dense data) now
> completes in a reasonable time. I guess using torrent helps a lot in this
> case.
>
>
> Best regards,
> Simon
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614p10833.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Reading hdf5 formats with pyspark

2014-07-28 Thread Mohit Singh
Hi,
   We have set up Spark on an HPC system and are trying to implement a
data pipeline and algorithms.
The input data is in HDF5 (these are very high-resolution brain images) and
it can be read via the h5py library in Python. So, my current approach (which
seems to be working) is writing a function
def process(filename):
   #logic

and then execute via
files = [list of filenames]
sc.parallelize(files).foreach(process)

Is this the right approach??
-- 
Mohit

"When you want success as badly as you want the air, then you will get it.
There is no other secret of success."
-Socrates


Re: KMeans: expensiveness of large vectors

2014-07-28 Thread durin
Hi Xiangrui,

using the current master meant a huge improvement for my task. Something
that did not even finish before (training with 120G of dense data) now
completes in a reasonable time. I guess using torrent helps a lot in this
case.


Best regards,
Simon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614p10833.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode

2014-07-28 Thread Jianshi Huang
I see, Andrew. Thanks for the explanation.

On Tue, Jul 29, 2014 at 5:29 AM, Andrew Lee  wrote:

>
> I was thinking maybe we could suggest that the community enhance the Spark
> HistoryServer to capture the last failure exception from the container logs
> in the last failed stage?
>

This would be helpful. I personally like Yarn-Client mode since all the
running status can be checked directly from the console.


-- 
Jianshi Huang

LinkedIn: jianshi
Twitter: @jshuang
Github & Blog: http://huangjs.github.com/


Re: The function of ClosureCleaner.clean

2014-07-28 Thread Mayur Rustagi
I am not sure about the specific purpose of this function, but Spark needs to
remove elements from the closure that are included by default but not really
needed, so that it can serialize the closure and send it to the executors to
operate on the RDD. For example, a function passed to an RDD's map() may
reference objects inside the enclosing class, so you may want to send across
those objects but not the whole parent class.
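
A small illustrative sketch of that situation (the class and field names are hypothetical, purely to show the pattern):

// If map() referenced `this.bonus` directly, the whole enclosing class would be
// pulled into the serialized closure; copying the field to a local val first means
// only that value is captured and shipped to the executors.
class WordScorer(val bonus: Int) {   // deliberately not Serializable
  def score(rdd: org.apache.spark.rdd.RDD[String]): org.apache.spark.rdd.RDD[Int] = {
    val localBonus = bonus                     // capture just the needed value, not `this`
    rdd.map(word => word.length + localBonus)  // closure now carries only an Int
  }
}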


Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi 



On Mon, Jul 28, 2014 at 8:28 PM, Wang, Jensen  wrote:

>  Hi, All
>
>   Before sc.runJob invokes dagScheduler.runJob, the func
> performed on the rdd is “cleaned” by ClosureCleaner.clean.
>
>  Why does Spark have to do this? What’s the purpose?
>


The function of ClosureCleaner.clean

2014-07-28 Thread Wang, Jensen
Hi, All
  Before sc.runJob invokes dagScheduler.runJob, the func performed
on the rdd is "cleaned" by ClosureCleaner.clean.
 Why does Spark have to do this? What's the purpose?


Re: ssh connection refused

2014-07-28 Thread Google
This may occur while the EC2 instances are not ready and the SSH port is not open
yet.

Please allow more time by specifying -w 300. The default should be 120.

Thanks,
Tracy
Sent from my iPhone

> On 2014年7月29日, at 上午8:17, sparking  wrote:
> 
> I'm trying to launch Spark with this command on AWS:
> *./spark-ec2 -k keypair_name -i keypair.pem -s 5 -t c1.xlarge -r us-west-2
> --hadoop-major-version=2.4.0 launch spark_cluster*
> 
> This script is erroring out with this message:
> *ssh: connect to host  port 22: Connection refused
> Error executing remote command, retrying after 30 seconds*: Command '['ssh',
> '-o', 'StrictHostKeyChecking=no', '-i', 'keypair.pem', '-t', '-t',
> u'root@', "\n  [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q
> -t rsa -N '' -f ~/.ssh/id_rsa &&\n cat ~/.ssh/id_rsa.pub >>
> ~/.ssh/authorized_keys)\n"]' returned non-zero exit status 255
> 
> The strange thing is, I can manually ssh to the master node as "root" using this
> command:
> *ssh root@ -i keypair.pem*
> 
> Does anyone know what is going on here? Any help is appreciated.
> 
> 
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/ssh-connection-refused-tp10818.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: [Spark 1.0.1][SparkSQL] reduce stage of shuffle is slow。

2014-07-28 Thread Earthson
"spark.MapOutputTrackerMasterActor: Asked to send map output locations for
shuffle 0 to" takes too much time, what should I do? What is the correct
configuration?

blockManager timeout if I using a small number of reduce partition.


 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SparkSQL-reduce-stage-of-shuffle-is-slow-tp10765p10825.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: streaming sequence files?

2014-07-28 Thread Barnaby Falls
Running as Standalone Cluster. From my monitoring console:
Spark Master at spark://101.73.54.149:7077
 * URL: spark://101.73.54.149:7077
 * Workers: 1
 * Cores: 2 Total, 0 Used
 * Memory: 2.4 GB Total, 0.0 B Used
 * Applications: 0 Running, 24 Completed
 * Drivers: 0 Running, 0 Completed
 * Status: ALIVE

Workers
   Id: worker-20140723222518-101.73.54.149-37995   Address: 101.73.54.149:37995   State: ALIVE   Cores: 2 (0 Used)   Memory: 2.4 GB (0.0 B Used)
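
Given the single two-core worker shown above and the max-cores point in the quoted reply below, a minimal sketch of capping an application's cores in standalone mode (the app name is made up; the master URL is taken from the status above):

// Limit this app to one core so a second streaming app can get the other one.
val conf = new org.apache.spark.SparkConf()
  .setMaster("spark://101.73.54.149:7077")
  .setAppName("StreamingAppA")           // hypothetical name
  .set("spark.cores.max", "1")           // standalone-mode cap per application
val ssc = new org.apache.spark.streaming.StreamingContext(conf, org.apache.spark.streaming.Seconds(5))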

> From: tathagata.das1...@gmail.com
> Date: Sat, 26 Jul 2014 20:14:37 -0700
> Subject: Re: streaming sequence files?
> To: user@spark.apache.org
> CC: u...@spark.incubator.apache.org
> 
> Which deployment environment are you running the streaming programs?
> Standalone? In that case you have to specify what is the max cores for
> each application, other all the cluster resources may get consumed by
> the application.
> http://spark.apache.org/docs/latest/spark-standalone.html
> 
> TD
> 
> On Thu, Jul 24, 2014 at 4:57 PM, Barnaby  wrote:
> > I have the streaming program writing sequence files. I can find one of the
> > files and load it in the shell using:
> >
> > scala> val rdd = sc.sequenceFile[String,
> > Int]("tachyon://localhost:19998/files/WordCounts/20140724-213930")
> > 14/07/24 21:47:50 INFO storage.MemoryStore: ensureFreeSpace(32856) called
> > with curMem=0, maxMem=309225062
> > 14/07/24 21:47:50 INFO storage.MemoryStore: Block broadcast_0 stored as
> > values to memory (estimated size 32.1 KB, free 294.9 MB)
> > rdd: org.apache.spark.rdd.RDD[(String, Int)] = MappedRDD[1] at sequenceFile
> > at :12
> >
> > So I got some type information, seems good.
> >
> > It took a while to research but I got the following streaming code to
> > compile and run:
> >
> > val wordCounts = ssc.fileStream[String, Int, SequenceFileInputFormat[String,
> > Int]](args(0))
> >
> > It works now and I offer this for reference to anybody else who may be
> > curious about saving sequence files and then streaming them back in.
> >
> > Question:
> > When running both streaming programs at the same time using spark-submit I
> > noticed that only one app would really run. To get the one app to continue I
> > had to stop the other app. Is there a way to get these running
> > simultaneously?
> >
> >
> >
> > --
> > View this message in context: 
> > http://apache-spark-user-list.1001560.n3.nabble.com/streaming-sequence-files-tp10557p10620.html
> > Sent from the Apache Spark User List mailing list archive at Nabble.com.
  

hdfs.BlockMissingException on Iterator.hasNext() in mapPartitionsWithIndex()

2014-07-28 Thread innowireless TaeYun Kim
Hi,

 

I'm trying to split one large multi-field text file into many single-field
text files.

My code is like this: (somewhat simplified)

 

final Broadcast<ColSchema> bcSchema = sc.broadcast(schema);
final String outputPathName = env.outputPathName;

sc.textFile(env.inputFileName)
    .mapPartitionsWithIndex(new Function2<Integer, Iterator<String>, Iterator<String>>()
    {
        @Override
        public Iterator<String> call(Integer partitionIndex, Iterator<String> itor) throws Exception
        {
            ColSchema schema = bcSchema.value();
            FileSystem outputFs = FileSystem.get(new URI(outputPathName), new Configuration());
            PrintStream[] outss = new PrintStream[schema.getColCount()];
            try
            {
                while (itor.hasNext())
                {
                    String cols[] = itor.next().split("\t", -1);

                    for (int i = 0; i < schema.getColCount(); i++)
                    {
                        String value = cols[i];
                        if (value.isEmpty())
                            continue;

                        if (outss[i] == null)
                            outss[i] = new PrintStream(
                                outputFs.create(new Path(outputPathName +
                                    "/" + schema.getColName(i) +
                                    ".tsv/part-" + String.format("%05d", partitionIndex))));

                        outss[i].println(value);
                    }
                }
            }
            finally
            {
                for (PrintStream outs : outss)
                    if (outs != null)
                        outs.close();
            }
            return new ArrayList<String>().iterator();  // dummy.
        }
    }, true)
    .count();  // just to invoke mapPartitionsWithIndex().

bcSchema.unpersist();

 

 

Basically, it uses mapPartitionsWithIndex() to write multiple single-field
files at once, partition by partition.

Eventually the job succeeds.

But occasionally during execution, the following exception is thrown and the
task fails (the task is automatically retried by Spark and then succeeds).

The failure location is itor.hasNext().

 

 

14/07/28 19:10:47 WARN TaskSetManager: Lost TID 154 (task 10.0:142)
14/07/28 19:10:47 WARN TaskSetManager: Loss was due to org.apache.hadoop.hdfs.BlockMissingException
org.apache.hadoop.hdfs.BlockMissingException: Could not obtain block: BP-1127695701-10.254.0.11-1405426572227:blk_1073930972_190153 file=/user/test/Data/big.tsv/part-00142
    at org.apache.hadoop.hdfs.DFSInputStream.chooseDataNode(DFSInputStream.java:880)
    at org.apache.hadoop.hdfs.DFSInputStream.blockSeekTo(DFSInputStream.java:560)
    at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:790)
    at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:837)
    at java.io.DataInputStream.read(DataInputStream.java:100)
    at org.apache.hadoop.util.LineReader.fillBuffer(LineReader.java:180)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:216)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:209)
    at org.apache.hadoop.mapred.LineRecordReader.next(LineRecordReader.java:47)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:198)
    at org.apache.spark.rdd.HadoopRDD$$anon$1.getNext(HadoopRDD.scala:181)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:71)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
    at com.test.tester.Splitter$17.call(Splitter.java:504)
    at com.test.tester.Splitter$17.call(Splitter.java:495)
    at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction2$1.apply(JavaPairRDD.scala:744)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:81)
    at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:81)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:569)
    at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:569)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
    at org.apache.spark.scheduler.Task.run(Task.scala:51)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at j

evaluating classification accuracy

2014-07-28 Thread SK
Hi,

In order to evaluate the ML classification accuracy, I am zipping up the
prediction and test labels as follows and then comparing the pairs in
predictionAndLabel:

val prediction = model.predict(test.map(_.features))
val predictionAndLabel = prediction.zip(test.map(_.label))


However, I am finding that predictionAndLabel.count() has fewer elements
than test.count().  For example, my test vector has 43 elements, but
predictionAndLabel has only 38 pairs. I have tried other samples and always
get fewer elements after zipping. 

Does zipping the two vectors cause any compression? or is this because of
the distributed nature of the algorithm (I am running it in local mode on a
single machine). In order to get the correct accuracy, I need the above
comparison to be done by a single node on the entire test data (my data is
quite small). How can I ensure that?

thanks 






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/evaluating-classification-accuracy-tp10822.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: how to publish spark inhouse?

2014-07-28 Thread Patrick Wendell
All of the scripts we use to publish Spark releases are in the Spark
repo itself, so you could follow these as a guideline. The publishing
process in Maven is similar to in SBT:

https://github.com/apache/spark/blob/master/dev/create-release/create-release.sh#L65

On Mon, Jul 28, 2014 at 12:39 PM, Koert Kuipers  wrote:
> ah ok thanks. guess i am gonna read up about maven-release-plugin then!
>
>
> On Mon, Jul 28, 2014 at 3:37 PM, Sean Owen  wrote:
>>
>> This is not something you edit yourself. The Maven release plugin
>> manages setting all this. I think virtually everything you're worried
>> about is done for you by this plugin.
>>
>> Maven requires artifacts to set a version and it can't inherit one. I
>> feel like I understood the reason this is necessary at one point.
>>
>> On Mon, Jul 28, 2014 at 8:33 PM, Koert Kuipers  wrote:
>> > and if i want to change the version, it seems i have to change it in all
>> > 23
>> > pom files? mhhh. is it mandatory for these sub-project pom files to
>> > repeat
>> > that version info? useful?
>> >
>> > spark$ grep 1.1.0-SNAPSHOT * -r  | wc -l
>> > 23
>> >
>> >
>> >
>> > On Mon, Jul 28, 2014 at 3:05 PM, Koert Kuipers 
>> > wrote:
>> >>
>> >> hey we used to publish spark inhouse by simply overriding the publishTo
>> >> setting. but now that we are integrated in SBT with maven i cannot find
>> >> it
>> >> anymore.
>> >>
>> >> i tried looking into the pom file, but after reading 1144 lines of xml
>> >> i
>> >> 1) havent found anything that looks like publishing
>> >> 2) i feel somewhat sick too
>> >> 3) i am considering alternative careers to developing...
>> >>
>> >> where am i supposed to look?
>> >> thanks for your help!
>> >
>> >
>
>


How true is this about spark streaming?

2014-07-28 Thread Rohit Pujari
Hello folks:

I came across a thread that said

* "A Spark RDD read/write access is driven by a context object and is
single threaded.  You cannot stream into Spark and read from the stream at
the same time.  You have to stop the stream processing, snapshot the RDD
and continue"*

Can you please offer some insights?


Thanks,
Rohit Pujari
Solutions Engineer, Hortonworks
rpuj...@hortonworks.com
716-430-6899

-- 
CONFIDENTIALITY NOTICE
NOTICE: This message is intended for the use of the individual or entity to 
which it is addressed and may contain information that is confidential, 
privileged and exempt from disclosure under applicable law. If the reader 
of this message is not the intended recipient, you are hereby notified that 
any printing, copying, dissemination, distribution, disclosure or 
forwarding of this communication is strictly prohibited. If you have 
received this communication in error, please contact the sender immediately 
and delete it from your system. Thank You.


Posterior probability in PySpark (MLLib) models

2014-07-28 Thread Vedant Dhandhania
Hi,

I was wondering if there a function to get the posterior probability of a
data point belonging to a specific class instead of the class labels in a
binary classification problem?

I tried reading through the API docs and could not get through.

I have my own functions in PySpark that do this, but I wanted to know if I
could leverage MLLib's PySpark API for the same?


For e.g. :
# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label,
model.predict(p.features)))

where model is a trained classifier.

PS: Sci-kit learn has this feature which is helpful in machine learning
tasks:
model.predict_proba()

Thanks,

*Vedant Dhandhania*


ssh connection refused

2014-07-28 Thread sparking
I'm trying to launch Spark with this command on AWS:
*./spark-ec2 -k keypair_name -i keypair.pem -s 5 -t c1.xlarge -r us-west-2
--hadoop-major-version=2.4.0 launch spark_cluster*

This script is erroring out with this message:
*ssh: connect to host  port 22: Connection refused
Error executing remote command, retrying after 30 seconds*: Command '['ssh',
'-o', 'StrictHostKeyChecking=no', '-i', 'keypair.pem', '-t', '-t',
u'root@', "\n  [ -f ~/.ssh/id_rsa ] ||\n(ssh-keygen -q
-t rsa -N '' -f ~/.ssh/id_rsa &&\n cat ~/.ssh/id_rsa.pub >>
~/.ssh/authorized_keys)\n"]' returned non-zero exit status 255

The strange thing is, I can manually ssh to the master node as "root" using this
command:
*ssh root@ -i keypair.pem*

Does anyone know what is going on here? Any help is appreciated.






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/ssh-connection-refused-tp10818.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark streaming vs. spark usage

2014-07-28 Thread Ankur Dave
On Mon, Jul 28, 2014 at 12:53 PM, Nathan Kronenfeld <
nkronenf...@oculusinfo.com> wrote:

> But when done processing, one would still have to pull out the wrapped
> object, knowing what it was, and I don't see how to do that.


It's pretty tricky to get the level of type safety you're looking for. I
know of two ways:

1. Leave RDD and DStream as they are, but define a typeclass that allows
converting them to a common DistributedCollection type.

2. Make RDD and DStream inherit from a common DistributedCollection trait,
as in your example, but use F-bounded polymorphism to express the concrete
types.
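
A minimal sketch of option 2, with a toy local collection standing in for the real RDD/DStream refactor (everything below is illustrative, not Spark API):

import scala.language.higherKinds

// The F-bounded type parameter Repr names the concrete collection type, so map()
// can return RDD[U] on an RDD and DStream[U] on a DStream while sharing one trait.
trait DistributedCollection[T, Repr[X] <: DistributedCollection[X, Repr]] {
  def map[U](f: T => U): Repr[U]
}

// Toy implementation just to show that the pattern type-checks.
class LocalColl[T](val elems: Seq[T]) extends DistributedCollection[T, LocalColl] {
  def map[U](f: T => U): LocalColl[U] = new LocalColl(elems.map(f))
}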

Ankur 


Re: Getting the number of slaves

2014-07-28 Thread Zongheng Yang
Nicholas,

The (somewhat common) situation you ran into probably meant the
executors were still connecting. A typical solution is to sleep a
couple seconds before querying that field.

On Mon, Jul 28, 2014 at 3:57 PM, Andrew Or  wrote:
> Yes, both of these are derived from the same source, and this source
> includes the driver. In other words, if you submit a job with 10 executors
> you will get back 11 for both statuses.
>
>
> 2014-07-28 15:40 GMT-07:00 Sung Hwan Chung :
>
>> Do getExecutorStorageStatus and getExecutorMemoryStatus both return the
>> number of executors + the driver?
>> E.g., if I submit a job with 10 executors, I get 11 for
>> getExeuctorStorageStatus.length and getExecutorMemoryStatus.size
>>
>>
>> On Thu, Jul 24, 2014 at 4:53 PM, Nicolas Mai 
>> wrote:
>>>
>>> Thanks, this is what I needed :) I should have searched more...
>>>
>>> Something I noticed though: after the SparkContext is initialized, I had
>>> to
>>> wait for a few seconds until sc.getExecutorStorageStatus.length returns
>>> the
>>> correct number of workers in my cluster (otherwise it returns 1, for the
>>> driver)...
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604p10619.html
>>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>>
>


Re: Getting the number of slaves

2014-07-28 Thread Andrew Or
Yes, both of these are derived from the same source, and this source
includes the driver. In other words, if you submit a job with 10 executors
you will get back 11 for both statuses.


2014-07-28 15:40 GMT-07:00 Sung Hwan Chung :

> Do getExecutorStorageStatus and getExecutorMemoryStatus both return the
> number of executors + the driver?
> E.g., if I submit a job with 10 executors, I get 11 for
> getExeuctorStorageStatus.length and getExecutorMemoryStatus.size
>
>
> On Thu, Jul 24, 2014 at 4:53 PM, Nicolas Mai 
> wrote:
>
>> Thanks, this is what I needed :) I should have searched more...
>>
>> Something I noticed though: after the SparkContext is initialized, I had
>> to
>> wait for a few seconds until sc.getExecutorStorageStatus.length returns
>> the
>> correct number of workers in my cluster (otherwise it returns 1, for the
>> driver)...
>>
>>
>>
>> --
>> View this message in context:
>> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604p10619.html
>> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>>
>
>


Re: Getting the number of slaves

2014-07-28 Thread Sung Hwan Chung
Do getExecutorStorageStatus and getExecutorMemoryStatus both return the
number of executors + the driver?
E.g., if I submit a job with 10 executors, I get 11 for
getExeuctorStorageStatus.length and getExecutorMemoryStatus.size


On Thu, Jul 24, 2014 at 4:53 PM, Nicolas Mai  wrote:

> Thanks, this is what I needed :) I should have searched more...
>
> Something I noticed though: after the SparkContext is initialized, I had to
> wait for a few seconds until sc.getExecutorStorageStatus.length returns the
> correct number of workers in my cluster (otherwise it returns 1, for the
> driver)...
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Getting-the-number-of-slaves-tp10604p10619.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Re: akka.tcp://spark@localhost:7077/user/MapOutputTracker akka.actor.ActorNotFound

2014-07-28 Thread Andrew Milkowski
Dear community, never mind! Although I was using Spark 1.0.0 everywhere, I did
not update my Spark client.

Changing the pom (from 0.9.0) to 1.0.0:

 1.0.0-cdh5.1.0
 1.0.0-cdh5.1.0

fixed the problem




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/akka-tcp-spark-localhost-7077-user-MapOutputTracker-akka-actor-ActorNotFound-tp10794p10813.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: sbt directory missed

2014-07-28 Thread redocpot
Thank you for your reply.

I need sbt for packaging my project and then submit it.

Could you tell me how to run a spark project on 1.0 AMI without sbt?

I don't understand why 1.0 only contains the prebuilt packages. I don't think
it makes sense, since sbt is essential.

Users have to download sbt or clone the GitHub repo, whereas in the 0.9 AMI, sbt
is pre-installed.

A command like: 
$ sbt/sbt package run
could do the job.

Thanks. =)



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-directory-missed-tp10783p10812.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


RE: Need help, got java.lang.ExceptionInInitializerError in Yarn-Client/Cluster mode

2014-07-28 Thread Andrew Lee
Hi Jianshi,
My understanding is 'No', based on how Spark is designed, even with your own
log4j.properties in Spark's conf folder.
In YARN mode, the ApplicationMaster runs inside the cluster and all logs are part
of the container logs, which are configured by another log4j.properties file from
the Hadoop/YARN environment. Spark can't override that unless it can place its own
log4j ahead of YARN's in the classpath. So the only way is to log in to the
ResourceManager and click on the job itself to read the container logs. (Other
people, please correct me if my understanding is wrong.)
You may be thinking: why can't I stream the logs to an external service (e.g.
Flume, syslogd) with a different appender in log4j? I don't consider this a good
practice since:
1. you need two infrastructures to operate the entire cluster;
2. you will need to open up firewall ports between the two services to
transfer/stream logs;
3. traffic is unpredictable, and the YARN cluster may bring down the logging
service/infrastructure (DDoS) when someone accidentally changes the logging level
from WARN to INFO or, worse, DEBUG.
I was thinking maybe we could suggest that the community enhance the Spark
HistoryServer to capture the last failure exception from the container logs in
the last failed stage? I'm not sure if this is a good idea since it may complicate
the event model. I'm also not sure whether the Akka model can support this, or
whether some other component in Spark could help capture these exceptions, pass
them back to the AM, and eventually store them somewhere for later
troubleshooting. I won't be clear on how this path is constructed until I read
the source code, so I can't give a better answer.
AL

From: jianshi.hu...@gmail.com
Date: Mon, 28 Jul 2014 13:32:05 +0800
Subject: Re: Need help, got java.lang.ExceptionInInitializerError in 
Yarn-Client/Cluster mode
To: user@spark.apache.org

Hi Andrew,
Thanks for the reply, I figured out the cause of the issue. Some resource files 
were missing in JARs. A class initialization depends on the resource files so 
it got that exception.


I appended the resource files explicitly to --jars option and it worked fine.
The "Caused by..." messages were found in yarn logs actually, I think it might 
be useful if I can seem them from the console which runs spark-submit. Would 
that be possible?


Jianshi


On Sat, Jul 26, 2014 at 7:08 AM, Andrew Lee  wrote:





Hi Jianshi,
Could you provide which HBase version you're using?
By the way, a quick sanity check on whether the Workers can access HBase?


Were you able to manually write one record to HBase with the serialize 
function? Hardcode and test it ?

From: jianshi.hu...@gmail.com


Date: Fri, 25 Jul 2014 15:12:18 +0800
Subject: Re: Need help, got java.lang.ExceptionInInitializerError in 
Yarn-Client/Cluster mode
To: user@spark.apache.org



I nailed it down to a union operation, here's my code snippet:
val properties: RDD[((String, String, String), Externalizer[KeyValue])] =
  vertices.map { ve =>
    val (vertices, dsName) = ve
    val rval = GraphConfig.getRval(datasetConf, Constants.VERTICES, dsName)
    val (_, rvalAsc, rvalType) = rval
    println(s"Table name: $dsName, Rval: $rval")
    println(vertices.toDebugString)
    vertices.map { v =>
      val rk = appendHash(boxId(v.id)).getBytes
      val cf = PROP_BYTES
      val cq = boxRval(v.rval, rvalAsc, rvalType).getBytes
      val value = Serializer.serialize(v.properties)
      ((new String(rk), new String(cf), new String(cq)),
       Externalizer(put(rk, cf, cq, value)))
    }
  }.reduce(_.union(_)).sortByKey(numPartitions = 32)

Basically I read data from multiple tables (Seq[RDD[(key, value)]]) and they're
transformed to a KeyValue to be inserted into HBase, so I need to do a
.reduce(_.union(_)) to combine them into one RDD[(key, value)].




I cannot see what's wrong in my code.
Jianshi


On Fri, Jul 25, 2014 at 12:24 PM, Jianshi Huang  wrote:




I can successfully run my code in local mode using spark-submit (--master 
local[4]), but I got ExceptionInInitializerError errors in Yarn-client mode.




Any hints what is the problem? Is it a closure serialization problem? How can I 
debug it? Your answers would be very helpful. 

14/07/25 11:48:14 WARN scheduler.TaskSetManager: Loss was due to java.lang.ExceptionInInitializerError
java.lang.ExceptionInInitializerError
    at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:40)
    at com.paypal.risk.rds.granada.storage.hbase.HBaseStore$$anonfun$1$$anonfun$apply$1.apply(HBaseStore.scala:36)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1016)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:847)
    at org.apache.spark.SparkContext$$anonfun$runJob$4

RE: Issues on spark-shell and spark-submit behave differently on spark-defaults.conf parameter spark.eventLog.dir

2014-07-28 Thread Andrew Lee
Hi Andrew,
Thanks for re-confirming the problem. I thought it only happened with my own build. :)
By the way, we have multiple users using spark-shell to explore their datasets, and
we are continuously looking into ways to isolate their job history. In the current
situation, we can't really ask them to create their own spark-defaults.conf since it
is set to read-only. A workaround is to set it to a shared folder, e.g.
/user/spark/logs with permission 1777. This isn't really ideal since other people
can see which other jobs are running on the shared cluster.
It would be nice to have better security here, so people aren't exposing their
algorithm (which is usually embedded in their job's name) to other users.
Is there, or will there be, a JIRA ticket to keep track of this? Any plan to enhance
this part for spark-shell?


Date: Mon, 28 Jul 2014 13:54:56 -0700
Subject: Re: Issues on spark-shell and spark-submit behave differently on 
spark-defaults.conf parameter spark.eventLog.dir
From: and...@databricks.com
To: user@spark.apache.org

Hi Andrew,
It's definitely not bad practice to use spark-shell with HistoryServer. The 
issue here is not with spark-shell, but the way we pass Spark configs to the 
application. spark-defaults.conf does not currently support embedding 
environment variables, but instead interprets everything as a string literal. 
You will have to manually specify "test" instead of "$USER" in the path you 
provide to spark.eventLog.dir.

-Andrew

2014-07-28 12:40 GMT-07:00 Andrew Lee :




Hi All,
Not sure if anyone has run into this problem, but this exists in Spark 1.0.0
when you specify the location in conf/spark-defaults.conf for

spark.eventLog.dir hdfs:///user/$USER/spark/logs
to use the $USER env variable. 

For example, I'm running the command with user 'test'.
In spark-submit, the folder will be created on-the-fly and you will see the 
event logs created on HDFS /user/test/spark/logs/spark-pi-1405097484152

but in spark-shell, the user 'test' folder is not created, and you will see 
this /user/$USER/spark/logs on HDFS. It will try to create 
/user/$USER/spark/logs instead of /user/test/spark/logs.

It looks like spark-shell couldn't pick up the env variable $USER to apply for 
the eventLog directory for the running user 'test'.

Is this considered a bug or bad practice to use spark-shell with Spark's 
HistoryServer?









  

  

Re: KMeans: expensiveness of large vectors

2014-07-28 Thread Xiangrui Meng
1. I meant in the n (1k) by m (10k) case, we need to broadcast k
centers and hence the total size is m * k. In 1.0, the driver needs to
send the current centers to each partition one by one. In the current
master, we use torrent to broadcast the centers to workers, which
should be much faster.

2. For MLlib algorithms, the number of partitions shouldn't be much
larger than the number of CPU cores. Your setting looks good.

3. You can use the hashing trick to limit the number of features, or
remove low-frequency and high-frequency words from the dictionary.
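
A minimal sketch of the hashing trick mentioned in point 3 (hand-rolled for illustration; this is not an MLlib 1.0 API):

import org.apache.spark.mllib.linalg.Vectors

// Hash each token into one of numFeatures buckets so the vector dimension stays
// bounded no matter how large the dictionary grows.
def hashedFeatures(tokens: Seq[String], numFeatures: Int) = {
  val counts = scala.collection.mutable.Map.empty[Int, Double].withDefaultValue(0.0)
  tokens.foreach { t =>
    val idx = math.abs(t.hashCode) % numFeatures
    counts(idx) += 1.0
  }
  Vectors.sparse(numFeatures, counts.toSeq)
}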

Best,
Xiangrui

On Mon, Jul 28, 2014 at 12:55 PM, durin  wrote:
> Hi Xiangru,
>
> thanks for the explanation.
>
> 1. You said we have to broadcast m * k centers (with m = number of rows). I
> thought there were only k centers at each time, which would then have size
> n * k and need to be broadcast. Is that a typo or did I misunderstand
> something?
> And the collection of the average is partition-wise. So more partitions =
> more overhead, but basically same number of operations?
>
> 2. I have 5 executors with 8 CPU cores and 25G of memory each, and I usually
> split the input RDD into 80 partitions for a few Gigs of input data. Is
> there a rule of thumb for the number of partitions in relation to the input
> size?
>
>
> 3. Assuming I wouldn't use numeric data but instead converted text data into
> a numeric representation using a dictionary and a featurization function:
> The number of columns would be the number of entries in my dictionary (i.e.
> number of distinct words in my case). I'd use a sparse vector representation
> of course. But even so, if I have a few hundred thousand entries and
> therefore columns, broadcasting overhead will get very large, as the centers
> are still in a dense representation.
> Do you know of any way to improve performance then?
>
>
> Best regards,
> Simon
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614p10804.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Issues on spark-shell and spark-submit behave differently on spark-defaults.conf parameter spark.eventLog.dir

2014-07-28 Thread Andrew Or
Hi Andrew,

It's definitely not bad practice to use spark-shell with HistoryServer. The
issue here is not with spark-shell, but the way we pass Spark configs to
the application. spark-defaults.conf does not currently support embedding
environment variables, but instead interprets everything as a string
literal. You will have to manually specify "test" instead of "$USER" in the
path you provide to spark.eventLog.dir.
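
For applications that build their own SparkConf, one possible sketch of working around the literal-string behavior described above (the property names are the standard event-log settings; resolving the user this way is just one option, and it does not apply to the auto-created spark-shell context):

// Resolve the user on the driver and set the literal path yourself, since
// spark-defaults.conf will not expand $USER.
val user = sys.props.getOrElse("user.name", "unknown")
val conf = new org.apache.spark.SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///user/" + user + "/spark/logs")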

-Andrew


2014-07-28 12:40 GMT-07:00 Andrew Lee :

> Hi All,
>
> Not sure if anyone has run into this problem, but this exists in Spark
> 1.0.0 when you specify the location in *conf/spark-defaults.conf* for
>
> spark.eventLog.dir hdfs:///user/$USER/spark/logs
>
> to use the *$USER* env variable.
>
> For example, I'm running the command with user 'test'.
>
> In *spark-submit*, the folder will be created on-the-fly and you will see
> the event logs created on HDFS
> */user/test/spark/logs/spark-pi-1405097484152*
>
> but in *spark-shell*, the user 'test' folder is not created, and you will
> see this */user/$USER/spark/logs* on HDFS. It will try to create
> */user/$USER/spark/logs* instead of */user/test/spark/logs*.
>
> It looks like spark-shell couldn't pick up the env variable $USER to apply
> for the eventLog directory for the running user 'test'.
>
> Is this considered a bug or bad practice to use spark-shell with Spark's
> HistoryServer?
>
>


RE: Spark java.lang.AbstractMethodError

2014-07-28 Thread Adrian Mocanu
I'm interested in this as well!

From: Alex Minnaar [mailto:aminn...@verticalscope.com]
Sent: July-28-14 3:40 PM
To: user@spark.apache.org
Subject: Spark java.lang.AbstractMethodError


I am trying to run an example Spark standalone app with the following code


import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkGensimLDA extends App{

  val ssc=new StreamingContext("local","testApp",Seconds(5))

  val lines=ssc.textFileStream("/.../spark_example/")

  val words=lines.flatMap(_.split(" "))

  val wordCounts=words.map(x => (x,1)).reduceByKey(_ + _)

  wordCounts.print()


  ssc.start()
  ssc.awaitTermination()

}





However I am getting the following error




15:35:40.170 [spark-akka.actor.default-dispatcher-2] ERROR 
akka.actor.ActorSystemImpl - Uncaught fatal error from thread 
[spark-akka.actor.default-dispatcher-3] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: null
at akka.actor.ActorCell.create(ActorCell.scala:580) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 [akka-actor_2.10-2.3.2.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [scala-library-2.10.4.jar:na]
15:35:40.171 [spark-akka.actor.default-dispatcher-2] ERROR 
akka.actor.ActorSystemImpl - Uncaught fatal error from thread 
[spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: null
at akka.actor.ActorCell.create(ActorCell.scala:580) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 [akka-actor_2.10-2.3.2.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [scala-library-2.10.4.jar:na]
15:35:40.175 [main] DEBUG o.a.spark.storage.DiskBlockManager - Creating local 
directories at root dirs '/var/folders/6y/h1f088_j007_d11kpwb1jg6mgp/T/'
15:35:40.176 [spark-akka.actor.default-dispatcher-4] ERROR 
akka.actor.ActorSystemImpl - Uncaught fatal error from thread 
[spark-akka.actor.default-dispatcher-2] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: 
org.apache.spark.storage.BlockManagerMasterActor.aroundPostStop()V
at 
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
 ~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:369) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 [akka-actor_2.10-2.3.2.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [scala-library-2.10.4.jar:na]
15:35:40.177 [spark-akka.actor

zip two RDD in pyspark

2014-07-28 Thread lllll
I have a file in s3 that I want to map each line with an index. Here is my
code:

>>> input_data = sc.textFile('s3n:/myinput',minPartitions=6).cache()
>>> N = input_data.count()
>>> index = sc.parallelize(range(N), 6)
>>> index.zip(input_data).collect()

...
14/07/28 19:49:31 INFO DAGScheduler: Completed ResultTask(18, 4)
14/07/28 19:49:31 INFO DAGScheduler: Stage 18 (collect at :1)
finished in 0.031 s
14/07/28 19:49:31 INFO SparkContext: Job finished: collect at :1,
took 0.03707 s
Traceback (most recent call last):
  File "", line 1, in 
  File "/root/spark/python/pyspark/rdd.py", line 584, in collect
return list(self._collect_iterator_through_file(bytesInJava))
  File "/root/spark/python/pyspark/rdd.py", line 592, in
_collect_iterator_through_file
self.ctx._writeToFile(iterator, tempFile.name)
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/java_gateway.py",
line 537, in __call__
  File "/root/spark/python/lib/py4j-0.8.1-src.zip/py4j/protocol.py", line
300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
z:org.apache.spark.api.python.PythonRDD.writeToFile.
: java.lang.ClassCastException: java.lang.String cannot be cast to [B
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:312)
at
org.apache.spark.api.python.PythonRDD$$anonfun$writeIteratorToStream$3.apply(PythonRDD.scala:309)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at
org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:309)
at 
org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:342)
at 
org.apache.spark.api.python.PythonRDD$.writeToFile(PythonRDD.scala:337)
at org.apache.spark.api.python.PythonRDD.writeToFile(PythonRDD.scala)
at sun.reflect.GeneratedMethodAccessor24.invoke(Unknown Source)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:744)

As I see it, the job completes, but I don't understand what is happening
with 'String cannot be cast to [B'. I tried to zip two ParallelCollectionRDDs
and it works fine. But here I have a MappedRDD from textFile. I'm not sure
what's going on here.

Also, why does Python not have zipWithIndex()?

Thanks for any help. 



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/zip-two-RDD-in-pyspark-tp10806.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: KMeans: expensiveness of large vectors

2014-07-28 Thread durin
Hi Xiangru,

thanks for the explanation.

1. You said we have to broadcast m * k centers (with m = number of rows). I
thought there were only k centers at each time, which would then have size
n * k and need to be broadcast. Is that a typo or did I misunderstand
something?
And the collection of the average is partition-wise. So more partitions =
more overhead, but basically same number of operations?

2. I have 5 executors with 8 CPU cores and 25G of memory each, and I usually
split the input RDD into 80 partitions for a few Gigs of input data. Is
there a rule of thumb for the number of partitions in relation to the input
size?


3. Assuming I wouldn't use numeric data but instead converted text data into
a numeric representation using a dictionary and a featurization function:
The number of columns would be the number of entries in my dictionary (i.e.
number of distinct words in my case). I'd use a sparse vector representation
of course. But even so, if I have a few hundred thousand entries and
therefore columns, broadcasting overhead will get very large, as the centers
are still in a dense representation.
Do you know of any way to improve performance then?


Best regards,
Simon



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/KMeans-expensiveness-of-large-vectors-tp10614p10804.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Spark streaming vs. spark usage

2014-07-28 Thread Nathan Kronenfeld
So after months and months, I finally started to try and tackle this, but
my scala ability isn't up to it.

The problem is that, of course, even with the common interface, we don't
want inter-operability between RDDs and DStreams.

I looked into Monads, as per Ashish's suggestion, and I think I understand
their relevance.  But when done processing, one would still have to pull
out the wrapped object, knowing what it was, and I don't see how to do that.

I'm guessing there is a way to do this in scala, but I'm not seeing it.

In detail, the requirement would be having something on the order of:

abstract class DistributedCollection[T] {
  def map[U](fcn: T => U): DistributedCollection[U]
  ...
}

class RDD[T] extends DistributedCollection[T] {
  // Note the return type that doesn't quite match the interface
  def map[U](fcn: T => U): RDD[U]
  ...
}

class DStream[T] extends DistributedCollection[T] {
  // Note the return type that doesn't quite match the interface
  def map[U](fcn: T => U): DStream[U]
  ...
}

Can anyone point me at a way to do this?
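
One sketch that type-checks (not Spark's actual API; the Fake* classes below
are hypothetical stand-ins) is to give the common trait a higher-kinded type
parameter for the concrete collection type:

import scala.language.higherKinds

// The concrete collection type CC is threaded through the trait,
// so map() can return the subtype rather than the base trait.
trait DistributedCollectionLike[A, CC[_]] {
  def map[B](f: A => B): CC[B]
}

// In-memory stand-ins, just to show the pattern compiles.
class FakeRDD[A](val data: Seq[A]) extends DistributedCollectionLike[A, FakeRDD] {
  def map[B](f: A => B): FakeRDD[B] = new FakeRDD(data.map(f))
}

class FakeDStream[A](val batches: Seq[Seq[A]]) extends DistributedCollectionLike[A, FakeDStream] {
  def map[B](f: A => B): FakeDStream[B] = new FakeDStream(batches.map(_.map(f)))
}

object Demo extends App {
  // Code written against the trait still gets the concrete type back.
  def addOne[CC[_]](c: DistributedCollectionLike[Int, CC]): CC[Int] = c.map(_ + 1)

  val rdd: FakeRDD[Int] = addOne(new FakeRDD(Seq(1, 2, 3)))
  println(rdd.data)  // List(2, 3, 4)
}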

Thanks,
 -Nathan



On Thu, Dec 19, 2013 at 1:08 AM, Ashish Rangole  wrote:

> I wonder if it will help to have a generic Monad container that wraps
> either RDD or DStream and provides
> map, flatmap, foreach and filter methods.
>
> case class DataMonad[A](data: A) {
> def map[B]( f : A => B ) : DataMonad[B] = {
>DataMonad( f( data ) )
> }
>
> def flatMap[B]( f : A => DataMonad[B] ) : DataMonad[B] = {
>f( data )
> }
>
> def foreach ...
> def withFilter ...
> :
> :
> etc, something like that
> }
>
> On Wed, Dec 18, 2013 at 10:42 PM, Reynold Xin  wrote:
>
>>
>> On Wed, Dec 18, 2013 at 12:17 PM, Nathan Kronenfeld <
>> nkronenf...@oculusinfo.com> wrote:
>>
>>>
>>>
>>> Since many of the functions exist in parallel between the two, I guess I
>>> would expect something like:
>>>
>>> trait BasicRDDFunctions {
>>> def map...
>>> def reduce...
>>> def filter...
>>> def foreach...
>>> }
>>>
>>> class RDD extends  BasicRDDFunctions...
>>> class DStream extends BasicRDDFunctions...
>>>
>>
>> I like this idea. We should discuss more about it on the dev list. It
>> would require refactoring some APIs, but does lead to better unification.
>>
>
>


-- 
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenf...@oculusinfo.com


Re: sbt directory missed

2014-07-28 Thread Shivaram Venkataraman
I think the 1.0 AMI only contains the prebuilt packages (i.e just the
binaries) of Spark and not the source code. If you want to build Spark on
EC2, you can clone the github repo and then use sbt.

Thanks
Shivaram


On Mon, Jul 28, 2014 at 8:49 AM, redocpot  wrote:

> update:
>
> Just checked the python launch script, when retrieving spark, it will refer
> to this script:
> https://github.com/mesos/spark-ec2/blob/v3/spark/init.sh
>
> where each version number is mapped to a tar file,
>
> 0.9.2)
>   if [[ "$HADOOP_MAJOR_VERSION" == "1" ]]; then
> wget
> http://s3.amazonaws.com/spark-related-packages/spark-0.9.2-bin-hadoop1.tgz
>   else
> wget
> http://s3.amazonaws.com/spark-related-packages/spark-0.9.2-bin-cdh4.tgz
>   fi
>   ;;
> 1.0.0)
>   if [[ "$HADOOP_MAJOR_VERSION" == "1" ]]; then
> wget
> http://s3.amazonaws.com/spark-related-packages/spark-1.0.0-bin-hadoop1.tgz
>   else
> wget
> http://s3.amazonaws.com/spark-related-packages/spark-1.0.0-bin-cdh4.tgz
>   fi
>   ;;
> 1.0.1)
>   if [[ "$HADOOP_MAJOR_VERSION" == "1" ]]; then
> wget
> http://s3.amazonaws.com/spark-related-packages/spark-1.0.1-bin-hadoop1.tgz
>   else
> wget
> http://s3.amazonaws.com/spark-related-packages/spark-1.0.1-bin-cdh4.tgz
>   fi
>   ;;
>
> I just checked the three last tar file. I find the /sbt directory and many
> other directory like bagel, mllib, etc in 0.9.2 tar file. However, they are
> not in 1.0.0 and 1.0.1 tar files.
>
> I am not sure that 1.0.X versions are mapped to the correct tar files.
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/sbt-directory-missed-tp10783p10784.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>


Issues on spark-shell and spark-submit behave differently on spark-defaults.conf parameter spark.eventLog.dir

2014-07-28 Thread Andrew Lee
Hi All,
Not sure if anyone has run into this problem, but it exists in Spark 1.0.0
when you specify the location in conf/spark-defaults.conf for
spark.eventLog.dir hdfs:///user/$USER/spark/logs
to use the $USER env variable. 
For example, I'm running the command with user 'test'.
In spark-submit, the folder will be created on-the-fly and you will see the 
event logs created on HDFS /user/test/spark/logs/spark-pi-1405097484152
but in spark-shell, the user 'test' folder is not created, and you will see 
this /user/$USER/spark/logs on HDFS. It will try to create 
/user/$USER/spark/logs instead of /user/test/spark/logs.
It looks like spark-shell couldn't pick up the env variable $USER to apply for 
the eventLog directory for the running user 'test'.
Is this considered a bug or bad practice to use spark-shell with Spark's 
HistoryServer?

Re: how to publish spark inhouse?

2014-07-28 Thread Koert Kuipers
ah ok thanks. guess i am gonna read up about maven-release-plugin then!


On Mon, Jul 28, 2014 at 3:37 PM, Sean Owen  wrote:

> This is not something you edit yourself. The Maven release plugin
> manages setting all this. I think virtually everything you're worried
> about is done for you by this plugin.
>
> Maven requires artifacts to set a version and it can't inherit one. I
> feel like I understood the reason this is necessary at one point.
>
> On Mon, Jul 28, 2014 at 8:33 PM, Koert Kuipers  wrote:
> > and if i want to change the version, it seems i have to change it in all
> 23
> > pom files? mhhh. is it mandatory for these sub-project pom files to
> repeat
> > that version info? useful?
> >
> > spark$ grep 1.1.0-SNAPSHOT * -r  | wc -l
> > 23
> >
> >
> >
> > On Mon, Jul 28, 2014 at 3:05 PM, Koert Kuipers 
> wrote:
> >>
> >> hey we used to publish spark inhouse by simply overriding the publishTo
> >> setting. but now that we are integrated in SBT with maven i cannot find
> it
> >> anymore.
> >>
> >> i tried looking into the pom file, but after reading 1144 lines of xml i
> >> 1) havent found anything that looks like publishing
> >> 2) i feel somewhat sick too
> >> 3) i am considering alternative careers to developing...
> >>
> >> where am i supposed to look?
> >> thanks for your help!
> >
> >
>


Spark java.lang.AbstractMethodError

2014-07-28 Thread Alex Minnaar
I am trying to run an example Spark standalone app with the following code


import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object SparkGensimLDA extends App{

  val ssc=new StreamingContext("local","testApp",Seconds(5))

  val lines=ssc.textFileStream("/.../spark_example/")

  val words=lines.flatMap(_.split(" "))

  val wordCounts=words.map(x => (x,1)).reduceByKey(_ + _)

  wordCounts.print()


  ssc.start()
  ssc.awaitTermination()

}



However I am getting the following error



15:35:40.170 [spark-akka.actor.default-dispatcher-2] ERROR 
akka.actor.ActorSystemImpl - Uncaught fatal error from thread 
[spark-akka.actor.default-dispatcher-3] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: null
at akka.actor.ActorCell.create(ActorCell.scala:580) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 [akka-actor_2.10-2.3.2.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [scala-library-2.10.4.jar:na]
15:35:40.171 [spark-akka.actor.default-dispatcher-2] ERROR 
akka.actor.ActorSystemImpl - Uncaught fatal error from thread 
[spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: null
at akka.actor.ActorCell.create(ActorCell.scala:580) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:456) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 [akka-actor_2.10-2.3.2.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [scala-library-2.10.4.jar:na]
15:35:40.175 [main] DEBUG o.a.spark.storage.DiskBlockManager - Creating local 
directories at root dirs '/var/folders/6y/h1f088_j007_d11kpwb1jg6mgp/T/'
15:35:40.176 [spark-akka.actor.default-dispatcher-4] ERROR 
akka.actor.ActorSystemImpl - Uncaught fatal error from thread 
[spark-akka.actor.default-dispatcher-2] shutting down ActorSystem [spark]
java.lang.AbstractMethodError: 
org.apache.spark.storage.BlockManagerMasterActor.aroundPostStop()V
at 
akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
 ~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.terminate(ActorCell.scala:369) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:462) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.actor.ActorCell.systemInvoke(ActorCell.scala:478) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:263) 
~[akka-actor_2.10-2.3.2.jar:na]
at akka.dispatch.Mailbox.run(Mailbox.scala:219) ~[akka-actor_2.10-2.3.2.jar:na]
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 [akka-actor_2.10-2.3.2.jar:na]
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 [scala-library-2.10.4.jar:na]
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[scala-library-2.10.4.jar:na]
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 [scala-library-2.10.4.jar:na]
15:35:40.177 [spark-akka.actor.default-dispatcher-4] ERROR 
akka.actor.ActorSystemImpl - Uncaught fatal error from thread 
[spark-akka.actor.default-dispatcher-4] shutting down ActorSystem [spark]
java.lang.AbstractMeth

Re: how to publish spark inhouse?

2014-07-28 Thread Sean Owen
This is not something you edit yourself. The Maven release plugin
manages setting all this. I think virtually everything you're worried
about is done for you by this plugin.

Maven requires artifacts to set a version and it can't inherit one. I
feel like I understood the reason this is necessary at one point.

On Mon, Jul 28, 2014 at 8:33 PM, Koert Kuipers  wrote:
> and if i want to change the version, it seems i have to change it in all 23
> pom files? mhhh. is it mandatory for these sub-project pom files to repeat
> that version info? useful?
>
> spark$ grep 1.1.0-SNAPSHOT * -r  | wc -l
> 23
>
>
>
> On Mon, Jul 28, 2014 at 3:05 PM, Koert Kuipers  wrote:
>>
>> hey we used to publish spark inhouse by simply overriding the publishTo
>> setting. but now that we are integrated in SBT with maven i cannot find it
>> anymore.
>>
>> i tried looking into the pom file, but after reading 1144 lines of xml i
>> 1) havent found anything that looks like publishing
>> 2) i feel somewhat sick too
>> 3) i am considering alternative careers to developing...
>>
>> where am i supposed to look?
>> thanks for your help!
>
>


Re: how to publish spark inhouse?

2014-07-28 Thread Koert Kuipers
and if i want to change the version, it seems i have to change it in all 23
pom files? mhhh. is it mandatory for these sub-project pom files to repeat
that version info? useful?

spark$ grep 1.1.0-SNAPSHOT * -r  | wc -l
23



On Mon, Jul 28, 2014 at 3:05 PM, Koert Kuipers  wrote:

> hey we used to publish spark inhouse by simply overriding the publishTo
> setting. but now that we are integrated in SBT with maven i cannot find it
> anymore.
>
> i tried looking into the pom file, but after reading 1144 lines of xml i
> 1) havent found anything that looks like publishing
> 2) i feel somewhat sick too
> 3) i am considering alternative careers to developing...
>
> where am i supposed to look?
> thanks for your help!
>


javasparksql Hbase

2014-07-28 Thread Madabhattula Rajesh Kumar
Hi Team,

Could you please let me know of an example program/link for JavaSparkSQL to join
2 HBase tables.

Regards,
Rajesh


how to publish spark inhouse?

2014-07-28 Thread Koert Kuipers
hey we used to publish spark inhouse by simply overriding the publishTo
setting. but now that we are integrated in SBT with maven i cannot find it
anymore.

i tried looking into the pom file, but after reading 1144 lines of xml i
1) havent found anything that looks like publishing
2) i feel somewhat sick too
3) i am considering alternative careers to developing...

where am i supposed to look?
thanks for your help!


Re: VertexPartition and ShippableVertexPartition

2014-07-28 Thread Ankur Dave
On Mon, Jul 28, 2014 at 4:29 AM, Larry Xiao  wrote:

> On 7/28/14, 3:41 PM, shijiaxin wrote:
>
>> There is a VertexPartition in the EdgePartition,which is created by
>>
>> EdgePartitionBuilder.toEdgePartition.
>>
>> and There is also a ShippableVertexPartition in the VertexRDD.
>>
>> These two Partitions have a lot of common things like index, data and
>>
>> Bitset, why is this necessary?
>>
>>

There is a VertexPartition in the EdgePartition,which is created by
>
Is the VertexPartition in the EdgePartition, the Mirror Cache part?


Yes, exactly. The primary copy of each vertex is stored in the VertexRDD
using the index, values, and mask data structures, which together form a
hash map. In addition, each partition of the VertexRDD stores the
corresponding partition of the routing table to facilitate joining with the
edges. The ShippableVertexPartition class encapsulates the vertex hash map
along with a RoutingTablePartition.

After joining the vertices with the edges, the edge partitions cache their
adjacent vertices in the mirror cache. They use the VertexPartition for
this, which provides only the hash map functionality and not the routing
table.

Ankur 


akka.tcp://spark@localhost:7077/user/MapOutputTracker akka.actor.ActorNotFound

2014-07-28 Thread Andrew Milkowski
Hello community

Using following distros:

spark:
http://archive.cloudera.com/cdh5/cdh/5/spark-1.0.0-cdh5.1.0-src.tar.gz
mesos: http://archive.apache.org/dist/mesos/0.19.0/mesos-0.19.0.tar.gz

both assembled with scala 2.10.4 and java 7

my spark-env.sh looks as follows:

#!/usr/bin/env bash

export SCALA_HOME=/opt/local/src/scala/scala-2.10.4
export
MESOS_NATIVE_LIBRARY=/opt/local/src/mesos/mesos-0.19.0/dist/lib/libmesos.so
export
SPARK_EXECUTOR_URI=hdfs://localhost:8020/spark/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz
export
HADOOP_CONF_DIR=/opt/local/cloudera/hadoop/cdh5/hadoop-2.3.0-cdh5.0.3/etc/hadoop
export STANDALONE_SPARK_MASTER_HOST=192.168.122.1

export MASTER=mesos://192.168.122.1
export SPARK_MASTER_IP=192.168.122.1
export SPARK_LOCAL_IP=192.168.122.1

When I run a sample spark job I get (below)

thanks in advance for an explanation/fix for the exception

Note: if I run the Spark job on Spark standalone by itself (or on Hadoop YARN), the job runs
without any problem


WARNING: Logging before InitGoogleLogging() is written to STDERR
I0728 14:33:52.421203 19678 fetcher.cpp:73] Fetching URI
'hdfs://localhost:8020/spark/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz'
I0728 14:33:52.421346 19678 fetcher.cpp:102] Downloading resource from
'hdfs://localhost:8020/spark/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz'
to
'/tmp/mesos/slaves/20140724-134606-16777343-5050-25095-0/frameworks/20140728-143300-24815808-5050-19059-/executors/20140724-134606-16777343-5050-25095-0/runs/c9c9eaa2-b722-4215-a35a-dc1c353963b9/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz'
I0728 14:33:58.201438 19678 fetcher.cpp:61] Extracted resource
'/tmp/mesos/slaves/20140724-134606-16777343-5050-25095-0/frameworks/20140728-143300-24815808-5050-19059-/executors/20140724-134606-16777343-5050-25095-0/runs/c9c9eaa2-b722-4215-a35a-dc1c353963b9/spark-1.0.0-cdh5.1.0-bin-2.3.0-cdh5.0.3.tgz'
into
'/tmp/mesos/slaves/20140724-134606-16777343-5050-25095-0/frameworks/20140728-143300-24815808-5050-19059-/executors/20140724-134606-16777343-5050-25095-0/runs/c9c9eaa2-b722-4215-a35a-dc1c353963b9'
Spark assembly has been built with Hive, including Datanucleus jars on
classpath
log4j:WARN No appenders could be found for logger
(org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
more info.
14/07/28 14:33:59 INFO SparkHadoopUtil: Using Spark's default log4j
profile: org/apache/spark/log4j-defaults.properties
WARNING: Logging before InitGoogleLogging() is written to STDERR
I0728 14:33:59.896520 19785 exec.cpp:131] Version: 0.19.0
I0728 14:33:59.899474 19805 exec.cpp:205] Executor registered on slave
20140724-134606-16777343-5050-25095-0
14/07/28 14:33:59 INFO MesosExecutorBackend: Registered with Mesos as
executor ID 20140724-134606-16777343-5050-25095-0
14/07/28 14:34:00 INFO SecurityManager: Changing view acls to: amilkowski
14/07/28 14:34:00 INFO SecurityManager: SecurityManager: authentication
disabled; ui acls disabled; users with view permissions: Set(amilkowski)
14/07/28 14:34:00 INFO Slf4jLogger: Slf4jLogger started
14/07/28 14:34:00 INFO Remoting: Starting remoting
14/07/28 14:34:01 INFO Remoting: Remoting started; listening on addresses
:[akka.tcp://spark@localhost:40412]
14/07/28 14:34:01 INFO Remoting: Remoting now listens on addresses:
[akka.tcp://spark@localhost:40412]
14/07/28 14:34:01 INFO SparkEnv: Connecting to MapOutputTracker:
akka.tcp://spark@localhost:7077/user/MapOutputTracker
akka.actor.ActorNotFound: Actor not found for:
ActorSelection[Actor[akka.tcp://spark@localhost
:7077/]/user/MapOutputTracker]
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:66)
at
akka.actor.ActorSelection$$anonfun$resolveOne$1.apply(ActorSelection.scala:64)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:67)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:82)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at
akka.dispatch.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:59)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$Batch.run(BatchingExecutor.scala:58)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.unbatchedExecute(Future.scala:74)
at akka.dispatch.BatchingExecutor$class.execute(BatchingExecutor.scala:110)
at
akka.dispatch.ExecutionContexts$sameThreadExecutionContext$.execute(Future.scala:73)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
at
scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:269)
at akka.actor.EmptyLocalActorRef.specialHandle(ActorRef.sc

Re: MLlib NNLS implementation is buggy, returning wrong solutions

2014-07-28 Thread Shuo Xiang
It is possible that the answer (the final solution vector x) given by two
different algorithms (such as the one in mllib and the one in R) is different, as
the problem may not be strictly convex and multiple global optima may
exist. However, these answers should admit the same objective value. Can
you give an example such that the objective value of the other method is better
(smaller) than the objective of mllib?


2014-07-27 11:06 GMT-07:00 Aureliano Buendia :

> Hi,
>
> The recently added NNLS implementation in MLlib returns wrong solutions.
> This is not data specific, just try any data in R's nnls, and then the same
> data in MLlib's NNLS. The results are very different.
>
> Also, the elected algorithm Polyak(1969) is not the best one around. The
> most popular one is Lawson-Hanson (1974):
>
> http://en.wikipedia.org/wiki/Non-negative_least_squares#Algorithms
>
>
>


Re: MLlib NNLS implementation is buggy, returning wrong solutions

2014-07-28 Thread Debasish Das
Hi Aureliano,

Will it be possible for you to give the test case? You can add it to the JIRA
as an attachment as well, I guess...

I am preparing the PR for ADMM based QuadraticMinimizer...In my matlab
experiments with scaling the rank to 1000 and beyond (which is too high for
ALS but gives a good idea of solver scalability, ~400 is the max I have
seen in the sparkler paper), I am noticing consistent results both in
correctness and runtime with MOSEK...

I will update more on the JIRA this week...got it cleared from our legal
last week...Please stay tuned...

https://issues.apache.org/jira/browse/SPARK-2426

Thanks.
Deb


On Sun, Jul 27, 2014 at 12:38 PM, DB Tsai  wrote:

> Could you help to provide a test case to verify this issue and open a JIRA
> to track this? Also, are you interested in submitting a PR to fix it? Thanks.
>
> Sent from my Google Nexus 5
> On Jul 27, 2014 11:07 AM, "Aureliano Buendia" 
> wrote:
>
>> Hi,
>>
>> The recently added NNLS implementation in MLlib returns wrong solutions.
>> This is not data specific, just try any data in R's nnls, and then the same
>> data in MLlib's NNLS. The results are very different.
>>
>> Also, the elected algorithm Polyak(1969) is not the best one around. The
>> most popular one is Lawson-Hanson (1974):
>>
>> http://en.wikipedia.org/wiki/Non-negative_least_squares#Algorithms
>>
>>
>>


Re: [Spark 1.0.1][SparkSQL] reduce stage of shuffle is slow。

2014-07-28 Thread Zongheng Yang
The optimal config depends on lots of things, but did you try a
smaller numPartition size? Just guessing -- 160 / 320 may be
reasonable.
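
For instance, reusing the sqlsc handle from your snippet, something like:

  sqlsc.set("spark.sql.shuffle.partitions", "320")  // ~2 x (80 executors x 2 cores) instead of 2048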

On Mon, Jul 28, 2014 at 1:52 AM, Earthson  wrote:
> I'm using SparkSQL with Hive 0.13, here is the SQL for inserting a partition
> with 2048 buckets.
> 
>   sqlsc.set("spark.sql.shuffle.partitions", "2048")
>   hql("""|insert %s table mz_log
>|PARTITION (date='%s')
>|select * from tmp_mzlog
>|CLUSTER BY mzid
> """.stripMargin.format(overwrite, log_date))
> 
>
> env:
>
> yarn-client mode with 80 executor, 2 cores/per executor.
>
> Data:
>
> original text log is about 1.1T.
>
> - - -
>
> the reduce stage is too slow.
>
> 
>
> here is the network usage, it's not the bottle neck.
>
> 
>
> and the CPU load is very high, why?
>
> 
> here is the configuration(conf/spark-defaults.conf)
>
> 
> spark.ui.port   
> spark.akka.frameSize128
> spark.akka.timeout  600
> spark.akka.threads  8
> spark.files.overwrite   true
> spark.executor.memory   2G
> spark.default.parallelism   32
> spark.shuffle.consolidateFiles  true
> spark.kryoserializer.buffer.mb  128
> spark.storage.blockManagerSlaveTimeoutMs20
> spark.serializerorg.apache.spark.serializer.KryoSerializer
> 
>
> 2 failed with MapTracker Error.
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SparkSQL-reduce-stage-of-shuffle-is-slow-tp10765.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Debugging "Task not serializable"

2014-07-28 Thread andy petrella
Also check the guides for the JVM option that prints messages for such
problems.
Sorry, sent from phone and don't know it by heart :/
On 28 Jul 2014 18:44, "Akhil Das"  wrote:

> A quick fix would be to implement java.io.Serializable in those classes
> which are causing this exception.
>
>
>
> Thanks
> Best Regards
>
>
> On Mon, Jul 28, 2014 at 9:21 PM, Juan Rodríguez Hortalá <
> juan.rodriguez.hort...@gmail.com> wrote:
>
>> Hi all,
>>
>> I was wondering if someone has conceived a method for debugging "Task not
>> serializable: java.io.NotSerializableException" errors, apart from
>> commenting and uncommenting parts of the program, or just turning
>> everything into Serializable. I find this kind of error very hard to debug,
>> as these originate in the Spark runtime system.
>>
>> I'm using Spark for Java.
>>
>> Thanks a lot in advance,
>>
>> Juan
>>
>
>


Re: Debugging "Task not serializable"

2014-07-28 Thread Akhil Das
A quick fix would be to implement java.io.Serializable in those classes
which are causing this exception.
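
For illustration, a minimal sketch of that fix; the Parser class, sample data
and app name below are made up, not from the original code:

import org.apache.spark.{SparkConf, SparkContext}

// A hypothetical helper whose instance is captured by the map() closure below.
// Without "extends Serializable", shipping that closure typically fails with
// "Task not serializable: java.io.NotSerializableException: Parser".
class Parser extends Serializable {
  def parse(line: String): Int = line.length
}

object SerializableDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("serializable-demo"))
    val parser = new Parser
    val lengths = sc.parallelize(Seq("a", "bb", "ccc")).map(parser.parse).collect()
    println(lengths.mkString(","))  // 1,2,3
    sc.stop()
  }
}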



Thanks
Best Regards


On Mon, Jul 28, 2014 at 9:21 PM, Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi all,
>
> I was wondering if someone has conceived a method for debugging "Task not
> serializable: java.io.NotSerializableException" errors, apart from
> commenting and uncommenting parts of the program, or just turning
> everything into Serializable. I find this kind of error very hard to debug,
> as these originate in the Spark runtime system.
>
> I'm using Spark for Java.
>
> Thanks a lot in advance,
>
> Juan
>


Re: Fraud management system implementation

2014-07-28 Thread Sandy Ryza
+user list
bcc: dev list

It's definitely possible to implement credit fraud management using Spark.
 A good start would be using some of the supervised learning algorithms
that Spark provides in MLLib (logistic regression or linear SVMs).
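
For a first feel of that API, a minimal sketch (assuming sc is an existing
SparkContext; the transaction features and labels below are invented, with
label 1.0 meaning fraud):

import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Invented features per transaction: amount, foreign-country flag, chip-present flag.
val training = sc.parallelize(Seq(
  LabeledPoint(0.0, Vectors.dense(12.50, 0.0, 1.0)),  // legitimate
  LabeledPoint(0.0, Vectors.dense(45.00, 0.0, 1.0)),  // legitimate
  LabeledPoint(1.0, Vectors.dense(900.0, 1.0, 0.0))   // fraud
))

val model = LogisticRegressionWithSGD.train(training, 100)  // 100 SGD iterations
val label = model.predict(Vectors.dense(850.0, 1.0, 0.0))   // 1.0 => flagged as likely fraud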

Spark doesn't have any HMM implementation right now.  Sean Owen has a great
talk on performing anomaly detection with KMeans clustering in Spark -
https://www.youtube.com/watch?v=TC5cKYBZAeI

-Sandy


On Mon, Jul 28, 2014 at 7:15 AM, jitendra shelar <
jitendra.shelar...@gmail.com> wrote:

> Hi,
>
> I am new to spark. I am learning spark and scala.
>
> I had some queries.
>
> 1) Can somebody please tell me if it is possible to implement credit
> card fraud management system using spark?
> 2) If yes, can somebody please guide me how to proceed.
> 3) Shall I prefer Scala or Java for this implementation?
>
> 4) Please suggest me some pointers related to Hidden Markov Model
> (HMM) and anomaly detection in data mining (using spark).
>
> Thanks,
> Jitendra
>


Debugging "Task not serializable"

2014-07-28 Thread Juan Rodríguez Hortalá
Hi all,

I was wondering if someone has conceived a method for debugging "Task not
serializable: java.io.NotSerializableException" errors, apart from
commenting and uncommenting parts of the program, or just turning
everything into Serializable. I find this kind of error very hard to debug,
as these originate in the Spark runtime system.

I'm using Spark for Java.

Thanks a lot in advance,

Juan


Re: sbt directory missed

2014-07-28 Thread redocpot
update:

Just checked the python launch script, when retrieving spark, it will refer
to this script:
https://github.com/mesos/spark-ec2/blob/v3/spark/init.sh

where each version number is mapped to a tar file,

0.9.2)
  if [[ "$HADOOP_MAJOR_VERSION" == "1" ]]; then
wget
http://s3.amazonaws.com/spark-related-packages/spark-0.9.2-bin-hadoop1.tgz
  else
wget
http://s3.amazonaws.com/spark-related-packages/spark-0.9.2-bin-cdh4.tgz
  fi
  ;;
1.0.0)
  if [[ "$HADOOP_MAJOR_VERSION" == "1" ]]; then
wget
http://s3.amazonaws.com/spark-related-packages/spark-1.0.0-bin-hadoop1.tgz
  else
wget
http://s3.amazonaws.com/spark-related-packages/spark-1.0.0-bin-cdh4.tgz
  fi
  ;;
1.0.1)
  if [[ "$HADOOP_MAJOR_VERSION" == "1" ]]; then
wget
http://s3.amazonaws.com/spark-related-packages/spark-1.0.1-bin-hadoop1.tgz
  else
wget
http://s3.amazonaws.com/spark-related-packages/spark-1.0.1-bin-cdh4.tgz
  fi
  ;;

I just checked the last three tar files. I found the /sbt directory and many
other directories like bagel, mllib, etc. in the 0.9.2 tar file. However, they are
not in the 1.0.0 and 1.0.1 tar files.

I am not sure that 1.0.X versions are mapped to the correct tar files.




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-directory-missed-tp10783p10784.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


sbt directory missed

2014-07-28 Thread redocpot
Hi, 

I have started a EC2 cluster using Spark by running spark-ec2 script.

Just a little confused: I cannot find the sbt/ directory under /spark.

I have checked the Spark version; it's 1.0.0 (default). When I was working with
0.9.x, sbt/ was there.

Has the script changed in 1.0.X? I cannot find any change log on this. Or
maybe I am missing something.

Certainly, I can download sbt and make things work. Just want to make things
clear.

Thank you.

Here is the file list of spark/

root@ip-10-81-154-223:~# ls -l spark
total 384
drwxrwxr-x 10 1000 1000   4096 Jul 28 14:58 .
drwxr-xr-x 20 root root   4096 Jul 28 14:58 ..
drwxrwxr-x  2 1000 1000   4096 Jul 28 13:34 bin
-rw-rw-r--  1 1000 1000 281471 May 26 07:02 CHANGES.txt
drwxrwxr-x  2 1000 1000   4096 Jul 28 08:22 conf
drwxrwxr-x  4 1000 1000   4096 May 26 07:02 ec2
drwxrwxr-x  3 1000 1000   4096 May 26 07:02 examples
drwxrwxr-x  2 1000 1000   4096 May 26 07:02 lib
-rw-rw-r--  1 1000 1000  29983 May 26 07:02 LICENSE
drwxr-xr-x  2 root root   4096 Jul 28 14:42 logs
-rw-rw-r--  1 1000 1000  22559 May 26 07:02 NOTICE
drwxrwxr-x  6 1000 1000   4096 May 26 07:02 python
-rw-rw-r--  1 1000 1000   4221 May 26 07:02 README.md
-rw-rw-r--  1 1000 1000 35 May 26 07:02 RELEASE
drwxrwxr-x  2 1000 1000   4096 May 26 07:02 sbin









--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sbt-directory-missed-tp10783.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Bad Digest error while doing aws s3 put

2014-07-28 Thread lmk
Hi
I was using saveAsTextFile earlier. It was working fine. When we migrated to
spark-1.0, I started getting the following error:
java.lang.ClassNotFoundException:
org.apache.spark.rdd.RDD$$anonfun$saveAsTextFile$1
java.net.URLClassLoader$1.run(URLClassLoader.java:366)
java.net.URLClassLoader$1.run(URLClassLoader.java:355)

Hence I changed my code as follows:

x.map(x => (NullWritable.get(), new
Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable,
Text]](path)

After this change I am facing this problem when I write very large data to S3. It
also occurs for some partitions only: say, while writing 240
partitions, it might succeed for 156 files and then start throwing
the Bad Digest error, after which it hangs.

Please advise.

Regards,
lmk



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Bad-Digest-error-while-doing-aws-s3-put-tp10036p10780.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Confusing behavior of newAPIHadoopFile

2014-07-28 Thread chang cheng
Exactly, the fields between "!!" is a (key, value) customized data structure. 

So, newAPIHadoopFile may be the best practice now. For this specific format,
change the delimiter from default "\n" to "!!\n" can be the cheapest, and
this can only be done in hadoop2.x, in hadoop1.x, this can be done by
Implementing a InputFormat although most codes are the same with
TextInputFormat apart from the delimiter. 

This is my first time talking in this mail list and I find you guys are
really nice! Thanks for your discussion with me!



-
Senior in Tsinghua Univ.
github: http://www.github.com/uronce-cc
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764p10779.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Confusing behavior of newAPIHadoopFile

2014-07-28 Thread Sean Owen
Oh, you literally mean these are different lines, not the structure of a line.

You can't solve this in general by reading the entire file into one
string. If the input is tens of gigabytes you will probably exhaust
memory on any of your machines. (Or, you might as well not bother with
Spark then.)

Do you really mean you want the strings that aren't "!!"? That's just
a filter operation. But as I understand it, you need an RDD of complex
data structures, containing many fields and key-value pairs across
many lines.

This is a difficult format to work with since Hadoop assumes a line is
a record, which is very common, but your records span lines.

If you have many small files, you could use wholeTextFiles to read
entire small text files as a string value, and simply parse it with a
Scala function as normal. That's fine as long as none of the files are
huge.
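
A rough sketch of that wholeTextFiles route (the path is hypothetical; each
element of records ends up as one multi-line "!!"-delimited block):

val records = sc.wholeTextFiles("hdfs:///path/to/dir").flatMap { case (fileName, content) =>
  content.split("!!").map(_.trim).filter(_.nonEmpty)
}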

You can try mapPartitions for larger files, where you can parse an
Iterator[String] instead of a String at a time and combine results
from across lines into an Iterator[YourRecordType]. This would work as
long as Hadoop does not break a file into several partitions, but not
quite if a partition break occurs in your record. If you're willing to
tolerate missing some records here and there, it is a fine scalable
way to do it.
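
And a rough sketch of the mapPartitions idea (Record is a hypothetical type,
sc the usual SparkContext; a record that straddles a partition boundary simply
comes out incomplete):

import scala.collection.mutable.ArrayBuffer

case class Record(lines: Seq[String])

val records = sc.textFile("hdfs:///path/to/input").mapPartitions { iter =>
  val current = ArrayBuffer.empty[String]   // lines of the record being built
  val done = ArrayBuffer.empty[Record]      // completed records in this partition
  iter.foreach { line =>
    if (line.trim == "!!") {
      if (current.nonEmpty) { done += Record(current.toList); current.clear() }
    } else if (line.trim.nonEmpty) {
      current += line
    }
  }
  if (current.nonEmpty) done += Record(current.toList)  // possibly partial at a partition break
  done.iterator
}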


On Mon, Jul 28, 2014 at 12:43 PM, chang cheng  wrote:
> Nop.
>
> My input file's format is:
> !!
> string1
> string2
> !!
> string3
> string4
>
> sc.textFile("path) will return RDD("!!", "string1", "string2", "!!",
> "string3", "string4")
>
> what we need now is to transform this rdd to RDD("string1", "string2",
> "string3", "string4")
>
> your solution may not handle this.
>
>
>
> -
> Senior in Tsinghua Univ.
> github: http://www.github.com/uronce-cc
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764p10777.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Confusing behavior of newAPIHadoopFile

2014-07-28 Thread chang cheng
Nope.

My input file's format is:
!!
string1
string2
!!
string3
string4

sc.textFile("path) will return RDD("!!", "string1", "string2", "!!",
"string3", "string4")

what we need now is to transform this rdd to RDD("string1", "string2",
"string3", "string4")

your solution may not handle this.



-
Senior in Tsinghua Univ.
github: http://www.github.com/uronce-cc
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764p10777.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Confusing behavior of newAPIHadoopFile

2014-07-28 Thread Sean Owen
Why the call to reduce? that's building up the entire text file as one
string. It seems like you want something more like:

sc.textFile("path").map(_.split("!!").filter(_.trim.length > 0))

This gives an RDD with an array of non-empty String tokens for each
line of the file.


On Mon, Jul 28, 2014 at 12:32 PM, gmail  wrote:
> Yes, I can implement like:
>
>
> sc.textFile("path").reduce(_ + _).split("!!").filter(x => x.trim.length > 0)
>
>
> But the reduce operation is expensive! I tested these two methods on a 6G
> file, the only operation with the created RDD is take(10).foreach(println),
> the method using newAPIHadoopFile only takes 2s while the code above will
> block for more than 1min because of the reduce I think.
>
>
> Would you post the code snippet to illustrate your idea? I didn't come up
> with an easy map, filter operation sequences on the RDD returned by
> textFile. Thanks!
>
> 
> 常铖 cheng chang
> Computer Science Dept. Tsinghua Univ.
> Mobile Phone: 13681572414
> WeChat ID: cccjcl
> 
>
> On July 28, 2014 at 5:40:21 PM, chang cheng (myai...@gmail.com) wrote:
>
> the value in (key, value) returned by textFile is exactly one line of the
> input.
>
> But what I want is the field between the two “!!”, hope this makes sense.
>
>
>
> -
> Senior in Tsinghua Univ.
> github: http://www.github.com/uronce-cc
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764p10768.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Confusing behavior of newAPIHadoopFile

2014-07-28 Thread gmail
Yes, I can implement like:

sc.textFile("path").reduce(_ + _).split("!!").filter(x => x.trim.length > 0)

But the reduce operation is expensive! I tested these two methods on a 6G file, 
the only operation with the created RDD is take(10).foreach(println), the 
method using newAPIHadoopFile only takes 2s while the code above will block for 
more than 1min because of the reduce I think.

Would you post the code snippet to illustrate your idea? I didn't come up with 
an easy map, filter operation sequences on the RDD returned by textFile. Thanks!

常铖 cheng chang
Computer Science Dept. Tsinghua Univ.
Mobile Phone: 13681572414
WeChat ID: cccjcl


On July 28, 2014 at 5:40:21 PM, chang cheng (myai...@gmail.com) wrote:

the value in (key, value) returned by textFile is exactly one line of the  
input.  

But what I want is the field between the two “!!”, hope this makes sense.  



-  
Senior in Tsinghua Univ.  
github: http://www.github.com/uronce-cc  
--  
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764p10768.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.  


Re: Confusing behavior of newAPIHadoopFile

2014-07-28 Thread chang cheng
Yes, I can implement like:

sc.textFile("path").reduce(_ + _).split("!!").filter(x => x.trim.length > 0)

But the reduce operation is expensive! I tested these two methods on a 6G
file, the only operation with the created RDD is take(10).foreach(println),
the method using newAPIHadoopFile only takes 2s while the code above will
block for more than 1min because of the reduce I think.

Would you post the code snippet to illustrate your idea? I didn't come up
with an easy map, filter operation sequences on the RDD returned by
textFile. Thanks!




-
Senior in Tsinghua Univ.
github: http://www.github.com/uronce-cc
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764p10773.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


NotSerializableException exception while using TypeTag in Scala 2.10

2014-07-28 Thread Aniket Bhatnagar
I am trying to serialize objects contained in RDDs using runtime reflection
via TypeTag. However, the Spark job keeps
failing with java.io.NotSerializableException on an instance of TypeCreator
(auto-generated by the compiler to enable TypeTags). Is there any workaround
for this without switching to scala 2.11?


Re: Confusing behavior of newAPIHadoopFile

2014-07-28 Thread chang cheng
the value in (key, value) returned by textFile is exactly one line of the
input.

But what I want is the field between the two “!!”, hope this makes sense.



-
Senior in Tsinghua Univ.
github: http://www.github.com/uronce-cc
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764p10768.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Confusing behavior of newAPIHadoopFile

2014-07-28 Thread gmail
the value in (key, value) returned by textFile is exactly one line of the input.

But what I want is the field between the two “!!”, hope this makes sense.

常铖 cheng chang
Computer Science Dept. Tsinghua Univ.
Mobile Phone: 13681572414
WeChat ID: cccjcl


On July 28, 2014 at 5:05:06 PM, Sean Owen (so...@cloudera.com) wrote:

Shouldn't you be using the textFile() method? you are reading the file  
directly using TextInputFormat, and you get the raw (key,value) pairs  
back, which are indeed (line number,line) for TextInputFormat. Your  
second solution is fine if, for some reason, you need to use that  
method.  

On Mon, Jul 28, 2014 at 9:02 AM, chang cheng  wrote:  
> Hi, all:  
>  
> I have a hadoop file containing fields seperated by "!!", like below:  
> !!  
> field1  
> key1 value1  
> key2 value2  
> !!  
> field2  
> key3 value3  
> key4 value4  
> !!  
>  
> I want to read the file into a pair in TextInputFormat, specifying delimiter  
> as "!!"  
>  
> First, I tried the following code:  
>  
> val hadoopConf = new Configuration()  
> hadoopConf.set("textinputformat.record.delimiter", "!!\n")  
>  
> val path = args(0)  
> val rdd = sc.newAPIHadoopFile(path, classOf[TextInputFormat],
> classOf[LongWritable], classOf[Text], hadoopConf)  
>  
> rdd.take(3).foreach(println)
>  
> Far from expectation, the result is:  
>  
> (120,)  
> (120,)  
> (120,)  
>  
> According to my experimentation, "120" is the byte offset of the last field  
> separated by "!!"  
>  
> After digging into spark source code, I find "textFileInput" is implemented  
> as:  
>  
> hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],
> classOf[Text],
> minPartitions).map(pair => pair._2.toString).setName(path)
>  
> So, I modified my initial code into: (bold text is the modification)  
>  
> val hadoopConf = new Configuration()  
> hadoopConf.set("textinputformat.record.delimiter", "!!\n")  
>  
> val path = args(0)  
> val rdd = sc.newAPIHadoopFile(path, classOf[TextInputFormat],
> classOf[LongWritable], classOf[Text], hadoopConf).*map(pair =>  
> pair._2.toString)*
>  
> rdd.take(3).foreach(println)
>  
> Then, the results are:  
>  
> filed1  
> key1 value1  
> key2 value2  
>  
> field2  
>   
> As expected.  
>  
> I'm confused by the first code snippet's behavior.  
> Hope you can offer an explanation. Thanks!  
>  
>  
>  
> -  
> Senior in Tsinghua Univ.  
> github: http://www.github.com/uronce-cc  
> --  
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764.html
>   
> Sent from the Apache Spark User List mailing list archive at Nabble.com.  


Re: Confusing behavior of newAPIHadoopFile

2014-07-28 Thread Sean Owen
Shouldn't you be using the textFile() method? you are reading the file
directly using TextInputFormat, and you get the raw (key,value) pairs
back, which are indeed (line number,line) for TextInputFormat. Your
second solution is fine if, for some reason, you need to use that
method.

On Mon, Jul 28, 2014 at 9:02 AM, chang cheng  wrote:
> Hi, all:
>
> I have a hadoop file containing fields seperated by "!!", like below:
> !!
> field1
> key1 value1
> key2 value2
> !!
> field2
> key3 value3
> key4 value4
> !!
>
> I want to read the file into a pair in TextInputFormat, specifying delimiter
> as "!!"
>
> First, I tried the following code:
>
> val hadoopConf = new Configuration()
> hadoopConf.set("textinputformat.record.delimiter", "!!\n")
>
> val path = args(0)
> val rdd = sc.newAPIHadoopFile(path, classOf[TextInputFormat],
>   classOf[LongWritable], classOf[Text], hadoopConf)
>
> rdd.take(3).foreach(println)
>
> Far from expectation, the result is:
>
> (120,)
> (120,)
> (120,)
>
> According to my experimentation, "120" is the byte offset of the last field
> separated by "!!"
>
> After digging into spark source code, I find "textFileInput" is implemented
> as:
>
>  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],
> classOf[Text],
>   minPartitions).map(pair => pair._2.toString).setName(path)
>
> So, I modified my initial code into: (bold text is the modification)
>
> val hadoopConf = new Configuration()
> hadoopConf.set("textinputformat.record.delimiter", "!!\n")
>
> val path = args(0)
> val rdd = sc.newAPIHadoopFile(path, classOf[TextInputFormat],
>   classOf[LongWritable], classOf[Text], hadoopConf).*map(pair =>
> pair._2.toString)*
>
> rdd.take(3).foreach(println)
>
> Then, the results are:
>
> filed1
> key1 value1
> key2 value2
>
> field2
> 
> As expected.
>
> I'm confused by the first code snippet's behavior.
> Hope you can offer an explanation. Thanks!
>
>
>
> -
> Senior in Tsinghua Univ.
> github: http://www.github.com/uronce-cc
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.


[Spark 1.0.1][SparkSQL] reduce stage of shuffle is slow。

2014-07-28 Thread Earthson
I'm using SparkSQL with Hive 0.13, here is the SQL for inserting a partition
with 2048 buckets.

  sqlsc.set("spark.sql.shuffle.partitions", "2048")
  hql("""|insert %s table mz_log
   |PARTITION (date='%s')
   |select * from tmp_mzlog
   |CLUSTER BY mzid
""".stripMargin.format(overwrite, log_date))


env:

yarn-client mode with 80 executor, 2 cores/per executor.

Data:

original text log is about 1.1T.

- - -

the reduce stage is too slow.


 

here is the network usage, it's not the bottleneck.


 

and the CPU load is very high, why? 


 
here is the configuration(conf/spark-defaults.conf)


spark.ui.port   
spark.akka.frameSize128
spark.akka.timeout  600
spark.akka.threads  8
spark.files.overwrite   true
spark.executor.memory   2G
spark.default.parallelism   32
spark.shuffle.consolidateFiles  true
spark.kryoserializer.buffer.mb  128
spark.storage.blockManagerSlaveTimeoutMs20
spark.serializerorg.apache.spark.serializer.KryoSerializer


2 failed with MapTracker Error.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-1-0-1-SparkSQL-reduce-stage-of-shuffle-is-slow-tp10765.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Confusing behavior of newAPIHadoopFile

2014-07-28 Thread chang cheng
Hi, all:

I have a hadoop file containing fields separated by "!!", like below:
!!
field1
key1 value1
key2 value2
!!
field2
key3 value3
key4 value4
!!

I want to read the file into a pair in TextInputFormat, specifying delimiter
as "!!"

First, I tried the following code:

val hadoopConf = new Configuration()
hadoopConf.set("textinputformat.record.delimiter", "!!\n")

val path = args(0)
val rdd = sc.newAPIHadoopFile(path, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], hadoopConf)

rdd.take(3).foreach(println)

Far from expectation, the result is:

(120,)
(120,)
(120,)

According to my experimentation, "120" is the byte offset of the last field
separated by "!!"

After digging into spark source code, I find "textFileInput" is implemented
as:

 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable],
classOf[Text],
  minPartitions).map(pair => pair._2.toString).setName(path)

So, I modified my initial code into: (bold text is the modification)

val hadoopConf = new Configuration()
hadoopConf.set("textinputformat.record.delimiter", "!!\n")

val path = args(0)
val rdd = sc.newAPIHadoopFile(path, classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text], hadoopConf).*map(pair =>
pair._2.toString)*

rdd.take(3).foreach(println)

Then, the results are:

field1
key1 value1
key2 value2

field2

As expected.

I'm confused by the first code snippet's behavior. 
Hope you can offer an explanation. Thanks!



-
Senior in Tsinghua Univ.
github: http://www.github.com/uronce-cc
--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Confusing-behavior-of-newAPIHadoopFile-tp10764.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


VertexPartition and ShippableVertexPartition

2014-07-28 Thread shijiaxin
There is a VertexPartition in the EdgePartition, which is created by
EdgePartitionBuilder.toEdgePartition,
and there is also a ShippableVertexPartition in the VertexRDD.
These two partitions have a lot in common, like index, data and
BitSet. Why is this necessary?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/VertexPartition-and-ShippableVertexPartition-tp10763.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.


Re: Hadoop Input Format - newAPIHadoopFile

2014-07-28 Thread chang cheng
Here is a tutorial on how to customize your own file format in hadoop:

https://developer.yahoo.com/hadoop/tutorial/module5.html#fileformat

and once you get your own file format, you can use it the same way as
TextInputFormat in spark as you have done in this post.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Hadoop-Input-Format-newAPIHadoopFile-tp2860p10762.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.