Re: Spark Streaming to Kafka

2015-05-19 Thread twinkle sachdeva
Thanks Saisai.

On Wed, May 20, 2015 at 11:23 AM, Saisai Shao 
wrote:

> I think here is the PR https://github.com/apache/spark/pull/2994 you
> could refer to.
>
> 2015-05-20 13:41 GMT+08:00 twinkle sachdeva :
>
>> Hi,
>>
>> As Spark streaming is being nicely integrated with consuming messages
>> from Kafka, so I thought of asking the forum, that is there any
>> implementation available for pushing data to Kafka from Spark Streaming too?
>>
>> Any link(s) will be helpful.
>>
>> Thanks and Regards,
>> Twinkle
>>
>
>


Re: How to implement an Evaluator for a ML pipeline?

2015-05-19 Thread Stefan H.

Thanks, Xiangrui, for clarifying the metric and creating that JIRA issue.

I made an error while composing my earlier mail: 
"paramMap.get(als.regParam)" in my Evaluator actually returns "None". I 
just happened to use "getOrElse(1.0)" in my tests, which explains why 
negating the metric did not change anything.


-Stefan

PS: I got an error while sending my previous mail via the web interface, 
and did not think it got through to the list. So I did not follow up on 
my problem myself. Sorry for the confusion.



Am 19.05.2015 um 21:54 schrieb Xiangrui Meng:

The documentation needs to be updated to state that higher metric
values are better (https://issues.apache.org/jira/browse/SPARK-7740).
I don't know why if you negate the return value of the Evaluator you
still get the highest regularization parameter candidate. Maybe you
should check the log messages from CrossValidator and see the average
metric values during cross validation. -Xiangrui

On Sat, May 9, 2015 at 12:15 PM, Stefan H.  wrote:

Hello everyone,

I am stuck with the (experimental, I think) API for machine learning
pipelines. I have a pipeline with just one estimator (ALS) and I want it to
try different values for the regularization parameter. Therefore I need to
supply an Evaluator that returns a value of type Double. I guess this could
be something like accuracy or mean squared error? The only implementation I
found is BinaryClassificationEvaluator, and I did not understand the
computation there.
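
What I have in mind is roughly the (untested) sketch below; the "rating" and
"prediction" column names are just my assumptions about what the fitted ALS
model produces, and the RMSE is negated on the assumption that a higher metric
value counts as better:

import org.apache.spark.ml.Evaluator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.DataFrame

// Rough sketch of a regression-style evaluator. The "rating" and "prediction"
// column names are assumptions; the RMSE is negated so that a higher value
// means a better fit.
class NegatedRmseEvaluator extends Evaluator {
  override def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = {
    val squaredErrors = dataset
      .selectExpr("cast(rating as double) as label", "cast(prediction as double) as pred")
      .map { row =>
        val diff = row.getDouble(0) - row.getDouble(1)
        diff * diff
      }
    -math.sqrt(squaredErrors.mean())
  }
}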

I could not find detailed documentation so I implemented a dummy Evaluator
that just returns the regularization parameter:

   new Evaluator {
     def evaluate(dataset: DataFrame, paramMap: ParamMap): Double =
       paramMap.get(als.regParam).getOrElse(throw new Exception)
   }

I just wanted to see whether the lower or higher value "wins". On the
resulting model I inspected the chosen regularization parameter this way:

   cvModel.bestModel.fittingParamMap.get(als.regParam)

And it was the highest of my three regularization parameter candidates.
Strange thing is, if I negate the return value of the Evaluator, that line
still returns the highest regularization parameter candidate.

So I am probably working with false assumptions. I'd be grateful if someone
could point me to some documentation or examples, or has a few hints to
share.

Cheers,
Stefan



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-an-Evaluator-for-a-ML-pipeline-tp22830.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org




-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Hive on Spark VS Spark SQL

2015-05-19 Thread Debasish Das
Spark SQL was built to improve further upon the Hive-on-Spark runtime...

On Tue, May 19, 2015 at 10:37 PM, guoqing0...@yahoo.com.hk <
guoqing0...@yahoo.com.hk> wrote:

> Hive on Spark and SparkSQL which should be better , and what are the key
> characteristics and the advantages and the disadvantages between ?
>
> --
> guoqing0...@yahoo.com.hk
>


java program Get Stuck at broadcasting

2015-05-19 Thread allanjie
​Hi All,
The variable I need to broadcast is just 468 MB.
 
 
When broadcasting, it just “stops” here:

*
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.tip.id is
deprecated. Instead, use mapreduce.task.id
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.id is
deprecated. Instead, use mapreduce.task.attempt.id
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.is.map is
deprecated. Instead, use mapreduce.task.ismap
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.task.partition is
deprecated. Instead, use mapreduce.task.partition
15/05/20 11:36:14 INFO Configuration.deprecation: mapred.job.id is
deprecated. Instead, use mapreduce.job.id
15/05/20 11:36:14 INFO mapred.FileInputFormat: Total input paths to process
: 1
15/05/20 11:36:14 INFO spark.SparkContext: Starting job: saveAsTextFile at
Test1.java:90
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at
Test1.java:90) with 4 output partitions (allowLocal=false)
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Final stage: Stage
0(saveAsTextFile at Test1.java:90)
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Parents of final stage:
List()
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Missing parents: List()
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting Stage 0
(MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90), which has no
missing parents
15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(129264) called
with curMem=988453294, maxMem=2061647216
15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2 stored as
values in memory (estimated size 126.2 KB, free 1023.4 MB)
15/05/20 11:36:15 INFO storage.MemoryStore: ensureFreeSpace(78190) called
with curMem=988582558, maxMem=2061647216
15/05/20 11:36:15 INFO storage.MemoryStore: Block broadcast_2_piece0 stored
as bytes in memory (estimated size 76.4 KB, free 1023.3 MB)
15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Master:44855 (size: 76.4 KB, free: 1492.4 MB)
15/05/20 11:36:15 INFO storage.BlockManagerMaster: Updated info of block
broadcast_2_piece0
15/05/20 11:36:15 INFO spark.SparkContext: Created broadcast 2 from
broadcast at DAGScheduler.scala:839
15/05/20 11:36:15 INFO scheduler.DAGScheduler: Submitting 4 missing tasks
from Stage 0 (MapPartitionsRDD[3] at saveAsTextFile at Test1.java:90)
15/05/20 11:36:15 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with
4 tasks
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 0.0 in stage
0.0 (TID 0, HadoopV26Slave5, NODE_LOCAL, 1387 bytes)
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 1.0 in stage
0.0 (TID 1, HadoopV26Slave3, NODE_LOCAL, 1387 bytes)
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 2.0 in stage
0.0 (TID 2, HadoopV26Slave4, NODE_LOCAL, 1387 bytes)
15/05/20 11:36:15 INFO scheduler.TaskSetManager: Starting task 3.0 in stage
0.0 (TID 3, HadoopV26Slave1, NODE_LOCAL, 1387 bytes)
15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave5:45357 (size: 76.4 KB, free: 2.1 GB)
15/05/20 11:36:15 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in
memory on HadoopV26Slave3:57821 (size: 76.4 KB, free: 2.1 GB)  
…….
15/05/20 11:36:28 INFO storage.BlockManagerInfo: Added broadcast_1_piece1 in
memory on HadoopV26Slave5:45357 (size: 4.0 MB, free: 1646.3 MB)
*

And it didn't go any further while I kept waiting; it doesn't exactly stop, it is
more like it is stuck.

I have 6 workers/VMs: each of them has 8GB memory and 12GB disk storage.
After a few minutes, the program stopped and showed something like this:


15/05/20 11:42:45 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0
(TID 1, HadoopV26Slave3):
org.apache.hadoop.ipc.RemoteException(java.io.IOException): File
/user/output/_temporary/0/_temporary/attempt_201505201136__m_01_1/part-1
could only be replicated to 0 nodes instead of minReplication (=1).  There
are 6 datanode(s) running and no node(s) are excluded in this operation.
at
org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.chooseTarget4NewBlock(BlockManager.java:1549)
at
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3200)
at
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:641)
at
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:482)
at
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2039)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2035)

Re: Spark Streaming to Kafka

2015-05-19 Thread Saisai Shao
I think here is the PR https://github.com/apache/spark/pull/2994 you could
refer to.
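
In the meantime, writing out to Kafka is usually done by hand from
foreachRDD/foreachPartition with the plain Kafka producer API. A rough sketch
(the broker list, topic and producer settings are placeholders; reusing a
pooled producer instead of creating one per partition is the usual optimization):

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

// Sketch: write every partition of a DStream to a Kafka topic with the plain
// producer API. Broker list and topic are placeholders passed in by the caller.
def writeToKafka(stream: DStream[String], brokers: String, topic: String): Unit = {
  stream.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      // One producer per partition for simplicity; pool and reuse it in real code.
      val producer = new KafkaProducer[String, String](props)
      records.foreach(r => producer.send(new ProducerRecord[String, String](topic, r)))
      producer.close()
    }
  }
}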

2015-05-20 13:41 GMT+08:00 twinkle sachdeva :

> Hi,
>
> As Spark streaming is being nicely integrated with consuming messages from
> Kafka, so I thought of asking the forum, that is there any implementation
> available for pushing data to Kafka from Spark Streaming too?
>
> Any link(s) will be helpful.
>
> Thanks and Regards,
> Twinkle
>


Spark Streaming to Kafka

2015-05-19 Thread twinkle sachdeva
Hi,

As Spark Streaming is nicely integrated with consuming messages from Kafka,
I thought of asking the forum: is there any implementation available for
pushing data to Kafka from Spark Streaming too?

Any link(s) will be helpful.

Thanks and Regards,
Twinkle


Hive on Spark VS Spark SQL

2015-05-19 Thread guoqing0...@yahoo.com.hk
Between Hive on Spark and Spark SQL, which should be better, and what are their
key characteristics, advantages and disadvantages?



guoqing0...@yahoo.com.hk


Re: Spark logo license

2015-05-19 Thread Justin Pihony
Thanks!

On Wed, May 20, 2015 at 12:41 AM, Matei Zaharia 
wrote:

> Check out Apache's trademark guidelines here:
> http://www.apache.org/foundation/marks/
>
> Matei
>
> On May 20, 2015, at 12:02 AM, Justin Pihony 
> wrote:
>
> What is the license on using the spark logo. Is it free to be used for
> displaying commercially?
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>
>


Re: Spark Job not using all nodes in cluster

2015-05-19 Thread ayan guha
What does your spark env file say? Are you setting the number of executors in
the spark context?
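
Also, a quick way to check whether it is just a partitioning issue is to ask
for more partitions and force a shuffle after the read. A rough sketch (the
path and the partition count are placeholders):

val files = sc.textFile("hdfs:///path/to/json", minPartitions = 64)
val spread = files.repartition(64)  // forces a shuffle so every executor gets tasks
println(spread.count())
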
On 20 May 2015 13:16, "Shailesh Birari"  wrote:

> Hi,
>
> I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB
> of RAM.
> I have around 600,000+ Json files on HDFS. Each file is small around 1KB in
> size. Total data is around 16GB. Hadoop block size is 256MB.
> My application reads these files with sc.textFile() (or sc.jsonFile()
> tried
> both) API. But all the files are getting read by only one node (4
> executors). Spark UI shows all 600K+ tasks on one node and 0 on other
> nodes.
>
> I confirmed that all files are accessible from all nodes. Some other
> application which uses big files uses all nodes on same cluster.
>
> Can you please let me know why it is behaving in such way ?
>
> Thanks,
>   Shailesh
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-using-all-nodes-in-cluster-tp22951.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Spark logo license

2015-05-19 Thread Matei Zaharia
Check out Apache's trademark guidelines here: 
http://www.apache.org/foundation/marks/ 


Matei

> On May 20, 2015, at 12:02 AM, Justin Pihony  wrote:
> 
> What is the license on using the spark logo. Is it free to be used for
> displaying commercially?
> 
> 
> 
> 
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
> 
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
> 



Re: Find KNN in Spark SQL

2015-05-19 Thread Debasish Das
The batch version of this is part of the rowSimilarities JIRA 4823. If your
query points can fit in memory, there is a broadcast version which we are
experimenting with internally; we are using brute-force KNN right now in
the PR. Based on the FLANN paper, LSH did not work well, but before you go to
approximate KNN you have to make sure your top-k precision/recall is not
degrading compared to brute force in your CV flow.

I have not yet extracted a KNN model, but that will use the IndexedRowMatrix
changes that we put in the PR
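
For small query sets, the broadcast brute-force idea is roughly the sketch
below. It is an illustration only, not the PR's actual code; squared Euclidean
distance, the (id, vector) tuple layout and the groupByKey at the end are all
simplifications:

import org.apache.spark.rdd.RDD

// Sketch of broadcast brute-force KNN: ship the (small) query set to every
// executor, score each data point against every query, and keep the k
// closest per query. groupByKey is used only for brevity; a bounded
// priority queue per partition is the usual optimization.
def bruteForceKnn(data: RDD[(Long, Array[Double])],
                  queries: Array[(Long, Array[Double])],
                  k: Int): Map[Long, Array[(Long, Double)]] = {
  val bcQueries = data.sparkContext.broadcast(queries)

  // squared Euclidean distance between two dense vectors of equal length
  val sqDist = (a: Array[Double], b: Array[Double]) => {
    var s = 0.0
    var i = 0
    while (i < a.length) { val d = a(i) - b(i); s += d * d; i += 1 }
    s
  }

  val scored = data.flatMap { case (id, vec) =>
    bcQueries.value.map { case (qid, q) => (qid, (id, sqDist(vec, q))) }
  }
  scored.groupByKey()
    .mapValues(_.toArray.sortBy(_._2).take(k))
    .collectAsMap()
    .toMap
}
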
On May 19, 2015 12:58 PM, "Xiangrui Meng"  wrote:

> Spark SQL doesn't provide spatial features. Large-scale KNN is usually
> combined with locality-sensitive hashing (LSH). This Spark package may
> be helpful: http://spark-packages.org/package/mrsqueeze/spark-hash.
> -Xiangrui
>
> On Sat, May 9, 2015 at 9:25 PM, Dong Li  wrote:
> > Hello experts,
> >
> > I’m new to Spark, and want to find K nearest neighbors on huge scale
> high-dimension points dataset in very short time.
> >
> > The scenario is: the dataset contains more than 10 million points, whose
> dimension is 200d. I’m building a web service, to receive one new point at
> each request and return K nearest points inside that dataset, also need to
> ensure the time-cost not very high. I have a cluster with several
> high-memory nodes for this service.
> >
> > Currently I only have these ideas here:
> > 1. To create several ball-tree instances in each node when service
> initializing. This is fast, but not perform well at data scaling ability. I
> cannot insert new nodes to the ball-trees unless I restart the services and
> rebuild them.
> > 2. To use sql based solution. Some database like PostgreSQL and
> SqlServer have features on spatial search. But these database may not
> perform well in big data environment. (Does SparkSQL have Spatial features
> or spatial index?)
> >
> > Based on your experience, can I achieve this scenario in Spark SQL? Or
> do you know other projects in Spark stack acting well for this?
> > Any ideas are appreciated, thanks very much.
> >
> > Regards,
> > Dong
> >
> >
> >
> >
> > -
> > To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> > For additional commands, e-mail: user-h...@spark.apache.org
> >
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Spark logo license

2015-05-19 Thread Justin Pihony
What is the license on using the spark logo. Is it free to be used for
displaying commercially?




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-logo-license-tp22952.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Spark sql error while writing Parquet file- Trying to write more fields than contained in row

2015-05-19 Thread Chandra Mohan, Ananda Vel Murugan
Hi,

Thanks for the response. I was looking for a java solution. I will check the 
scala and python ones.

Regards,
Anand.C

From: Todd Nist [mailto:tsind...@gmail.com]
Sent: Tuesday, May 19, 2015 6:17 PM
To: Chandra Mohan, Ananda Vel Murugan
Cc: ayan guha; user
Subject: Re: Spark sql error while writing Parquet file- Trying to write more 
fields than contained in row

I believe you're looking for df.na.fill in Scala; in the PySpark module it is fillna
(http://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

from the docs:

df4.fillna({'age': 50, 'name': 'unknown'}).show()

age height name
10  80     Alice
5   null   Bob
50  null   Tom
50  null   unknown

On Mon, May 18, 2015 at 11:01 PM, Chandra Mohan, Ananda Vel Murugan 
<ananda.muru...@honeywell.com> wrote:
Hi,

Thanks for the response. But I could not see the fillna function in the DataFrame class.



Is it available in some specific version of Spark SQL? This is what I have in
my pom.xml:


<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.10</artifactId>
  <version>1.3.1</version>
</dependency>

Regards,
Anand.C

From: ayan guha [mailto:guha.a...@gmail.com]
Sent: Monday, May 18, 2015 5:19 PM
To: Chandra Mohan, Ananda Vel Murugan; user
Subject: Re: Spark sql error while writing Parquet file- Trying to write more 
fields than contained in row

Hi

Give the dataFrame.fillna function a try to fill up the missing columns

Best
Ayan

On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan 
<ananda.muru...@honeywell.com> wrote:
Hi,

I am using spark-sql to read a CSV file and write it as parquet file. I am 
building the schema using the following code.

String schemaString = "a b c";
List<StructField> fields = new ArrayList<StructField>();
MetadataBuilder mb = new MetadataBuilder();
mb.putBoolean("nullable", true);
Metadata m = mb.build();
for (String fieldName : schemaString.split(" ")) {
    fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
}
StructType schema = DataTypes.createStructType(fields);

Some of the rows in my input csv do not contain three columns. After building
my JavaRDD, I create a data frame as shown below using the RDD and schema.

DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);

Finally I try to save it as Parquet file

darDataFrame.saveAsParquetFile("/home/anand/output.parquet")

I get this error when saving it as Parquet file

java.lang.IndexOutOfBoundsException: Trying to write more fields than contained 
in row (3 > 2)

I understand the reason behind this error. Some of the rows in my Row RDD do not
contain three elements, as some rows in my input csv do not contain three
columns. But while building the schema, I am specifying every field as
nullable, so I believe it should not throw this error. Can anyone help me fix
this error? Thank you.

Regards,
Anand.C





--
Best Regards,
Ayan Guha



Spark Job not using all nodes in cluster

2015-05-19 Thread Shailesh Birari
Hi,

I have a 4 node Spark 1.3.1 cluster. All four nodes have 4 cores and 64 GB
of RAM.
I have around 600,000+ Json files on HDFS. Each file is small around 1KB in
size. Total data is around 16GB. Hadoop block size is 256MB.
My application reads these files with sc.textFile() (or sc.jsonFile()  tried
both) API. But all the files are getting read by only one node (4
executors). Spark UI shows all 600K+ tasks on one node and 0 on other nodes.

I confirmed that all files are accessible from all nodes. Some other
application which uses big files uses all nodes on same cluster.

Can you please let me know why it is behaving in such way ?

Thanks,
  Shailesh




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Job-not-using-all-nodes-in-cluster-tp22951.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to use spark to access HBase with Security enabled

2015-05-19 Thread donhoff_h
Sorry, this ref does not help me. I have set up the configuration in
hbase-site.xml. But it seems there are still some extra configurations to be
set or APIs to be called to make my spark program able to pass
authentication with HBase.

Does anybody know how to set up authentication to a secured HBase in a spark
program which uses the API "newAPIHadoopRDD" to get information from HBase?
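
For reference, the client-side pieces that (as far as I understand) usually have
to be in place look roughly like the sketch below; the principal and keytab path
are placeholders, and this alone may not be enough, since the executors also need
valid credentials:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapreduce.TableInputFormat
import org.apache.hadoop.security.UserGroupInformation

// Sketch only: settings a Kerberos-secured HBase client usually needs.
// Principal and keytab path are placeholders.
val conf = HBaseConfiguration.create()
conf.set("hadoop.security.authentication", "kerberos")
conf.set("hbase.security.authentication", "kerberos")
conf.set(TableInputFormat.INPUT_TABLE, "spark_t01")
UserGroupInformation.setConfiguration(conf)
UserGroupInformation.loginUserFromKeytab("principal@YOUR.REALM", "/path/to/your.keytab")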

Many Thanks!



------------------ Original Message ------------------
From: "yuzhihong";
Date: Tuesday, May 19, 2015, 9:54
To: "donhoff_h" <165612...@qq.com>;
Cc: "user";
Subject: Re: How to use spark to access HBase with Security enabled



Please take a look at:
http://hbase.apache.org/book.html#_client_side_configuration_for_secure_operation



Cheers


On Tue, May 19, 2015 at 5:23 AM, donhoff_h <165612...@qq.com> wrote:


The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my spark 
programs. I am sure I have run the kinit command to make it take effect. And I 
also used the HBase Shell to verify that this user has the right to scan and 
put the tables in HBase.


Now I still have no idea how to solve this problem. Can anybody help me to 
figure it out? Many Thanks!


------------------ Original Message ------------------
From: "yuzhihong";
Date: Tuesday, May 19, 2015, 7:55
To: "donhoff_h" <165612...@qq.com>;
Cc: "user";
Subject: Re: How to use spark to access HBase with Security enabled



Which user did you run your program as ?


Have you granted proper permission on hbase side ?


You should also check master log to see if there was some clue. 


Cheers




On May 19, 2015, at 2:41 AM, donhoff_h <165612...@qq.com> wrote:


Hi, experts.

I ran the "HBaseTest" program which is an example from the Apache Spark source 
code to learn how to use spark to access HBase. But I met the following 
exception:
Exception in thread "main" 
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after 
attempts=36, exceptions:
Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: 
callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 
'hbase:meta' at region=hbase:meta,,1.1588230740, 
hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0

I also checked the RegionServer Log of the host "bgdt01.dev.hrb" listed in the 
above exception. I found a few entries like the following one:
2015-05-19 16:59:11,143 DEBUG 
[RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: 
RpcServer.listener,port=16020: Caught exception while reading:Authentication is 
required 

The above entry did not point to my program clearly. But the time is very near. 
Since my hbase version is HBase1.0.0 and I set security enabled, I doubt the 
exception was caused by the Kerberos authentication.  But I am not sure.

Do anybody know if my guess is right? And if I am right, could anybody tell me 
how to set Kerberos Authentication in a spark program? I don't know how to do 
it. I already checked the API doc , but did not found any API useful. Many 
Thanks!

By the way, my spark version is 1.3.0. I also paste the code of "HBaseTest" in 
the following:
***Source Code**
object HBaseTest {
  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseTest")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, args(0))

// Initialize hBase table if necessary
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(args(0))) {
  val tableDesc = new HTableDescriptor(args(0))
  admin.createTable(tableDesc)
}


val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])


hBaseRDD.count()


sc.stop()
  }
}

Re: spark 1.3.1 jars in repo1.maven.org

2015-05-19 Thread Ted Yu
I think your observation is correct.
e.g.
http://mvnrepository.com/artifact/org.apache.spark/spark-core_2.10/1.3.1
shows that it depends on hadoop-client from Hadoop 2.2.

Cheers

On Tue, May 19, 2015 at 6:17 PM, Edward Sargisson  wrote:

> Hi,
> I'd like to confirm an observation I've just made. Specifically that spark
> is only available in repo1.maven.org for one Hadoop variant.
>
> The Spark source can be compiled against a number of different Hadoops
> using profiles. Yay.
> However, the spark jars in repo1.maven.org appear to be compiled against
> one specific Hadoop and no other differentiation is made. (I can see a
> difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in
> the version I compiled locally).
>
> The implication here is that if you have a pom file asking for
> spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2
> version. Maven assumes that non-snapshot artifacts never change so trying
> to load an Hadoop 1 version will end in tears.
>
> This then means that if you compile code against spark-core then there
> will probably be classpath NoClassDefFound issues unless the Hadoop 2
> version is exactly the one you want.
>
> Have I gotten this correct?
>
> It happens that our little app is using a Spark context directly from a
> Jetty webapp and the classpath differences were/are causing some confusion.
> We are currently installing a Hadoop 1 spark master and worker.
>
> Thanks a lot!
> Edward
>


Re: Reading Binary files in Spark program

2015-05-19 Thread Tapan Sharma
The problem is still there.
The exception is not coming at the time of reading.
Also, the count of the JavaPairRDD is as expected. It is when we call the
collect() or toArray() methods that the exception comes.
It seems to be something to do with the Text class, even though I haven't used
it in the program.

Regards
Tapan

On Tue, May 19, 2015 at 6:26 PM, Akhil Das 
wrote:

> Try something like:
>
> JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir,
>   org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
> IntWritable.class,
>   Text.class, new Job().getConfiguration());
>
> With the type of input format that you require.
>
> Thanks
> Best Regards
>
> On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma 
> wrote:
>
>> Hi Team,
>>
>> I am new to Spark and learning.
>> I am trying to read image files into spark job. This is how I am doing:
>> Step 1. Created sequence files with FileName as Key and Binary image as
>> value. i.e.  Text and BytesWritable.
>> I am able to read these sequence files into Map Reduce programs.
>>
>> Step 2.
>> I understand that Text and BytesWritable are Non Serializable therefore, I
>> read the sequence file in Spark as following:
>>
>> SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
>> JavaSparkContext ctx = new JavaSparkContext(sparkConf);
>> JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0],
>> String.class, Byte.class) ;
>> final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();
>>
>>
>>
>>
>> The moment I try to call collect() method to get the keys of sequence
>> file,
>> following exception has been thrown
>>
>> Can any one help me understanding why collect() method is failing? If I
>> use
>> toArray() on seqFiles object then also I am getting same call stack.
>>
>> Regards
>> Tapan
>>
>>
>>
>> java.io.NotSerializableException: org.apache.hadoop.io.Text
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>> at
>>
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> at
>>
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> at
>> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>> at
>>
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>> at
>>
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> 2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
>> scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage
>> 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not
>> retrying
>> 2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
>> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
>> 0.0, whose tasks have all completed, from pool
>> 2015-05-19 15:15:03,739 INFO
>> [sparkDriver-akka.actor.default-dispatcher-2]
>> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling
>> stage 0
>> 2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
>> (Logging.scala:logInfo(59)) - Job 0 failed: collect at
>> JavaSequenceFile.java:44, took 4.421397 s
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due
>> to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
>> result: org.apache.hadoop.io.Text
>> at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>> at
>>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at scala.Option.foreach(Option.scala:236)
>> at
>>
>> or

spark 1.3.1 jars in repo1.maven.org

2015-05-19 Thread Edward Sargisson
Hi,
I'd like to confirm an observation I've just made. Specifically that spark
is only available in repo1.maven.org for one Hadoop variant.

The Spark source can be compiled against a number of different Hadoops
using profiles. Yay.
However, the spark jars in repo1.maven.org appear to be compiled against
one specific Hadoop and no other differentiation is made. (I can see a
difference with hadoop-client being 2.2.0 in repo1.maven.org and 1.0.4 in
the version I compiled locally).

The implication here is that if you have a pom file asking for
spark-core_2.10 version 1.3.1 then Maven will only give you an Hadoop 2
version. Maven assumes that non-snapshot artifacts never change so trying
to load an Hadoop 1 version will end in tears.

This then means that if you compile code against spark-core then there will
probably be classpath NoClassDefFound issues unless the Hadoop 2 version is
exactly the one you want.

Have I gotten this correct?

It happens that our little app is using a Spark context directly from a
Jetty webapp and the classpath differences were/are causing some confusion.
We are currently installing a Hadoop 1 spark master and worker.

Thanks a lot!
Edward


Re: EOFException using KryoSerializer

2015-05-19 Thread Imran Rashid
Hi Jim,

this is definitely strange.  It sure sounds like a bug, but it is also a
very commonly used code path, so at the very least you must be hitting a
corner case.  Could you share a little more info with us?  What version of
spark are you using?  How big is the object you are trying to broadcast?
Can you share more of the logs from before the exception?

It is not too surprising this shows up in mesos but not in local mode.
Local mode never exercises the part of the code that needs to deserialize
the blocks of a broadcast variables (though it actually does serialize the
data into blocks).  So I doubt its mesos specific, more likely it would
happen in any cluster mode -- yarn, standalone, or even local-cluster (a
pseudo-cluster just for testing).

Imran

On Tue, May 19, 2015 at 3:56 PM, Jim Carroll  wrote:

> I'm seeing the following exception ONLY when I run on a Mesos cluster. If I
> run the exact same code with master set to "local[N]" I have no problem:
>
>  2015-05-19 16:45:43,484 [task-result-getter-0] WARN  TaskSetManager - Lost
> task 0.0 in stage 0.0 (TID 0, 10.253.1.101): java.io.EOFException
> at
>
> org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
> at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
> at
>
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
> at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at
> org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> KryoSerializer explicitly throws an EOFException. The comment says:
>
> // DeserializationStream uses the EOF exception to indicate stopping
> condition.
>
> Apparently this isn't what TorrentBroadcast expects.
>
> Any suggestions? Thanks.
>
> Jim
>
>
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Re: Reading Binary files in Spark program

2015-05-19 Thread Tapan Sharma
Thanks. I will try and let you know. But what exactly is the issue? Any
pointers?

Regards
Tapan

On Tue, May 19, 2015 at 6:26 PM, Akhil Das 
wrote:

> Try something like:
>
> JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir,
>   org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
> IntWritable.class,
>   Text.class, new Job().getConfiguration());
>
> With the type of input format that you require.
>
> Thanks
> Best Regards
>
> On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma 
> wrote:
>
>> Hi Team,
>>
>> I am new to Spark and learning.
>> I am trying to read image files into spark job. This is how I am doing:
>> Step 1. Created sequence files with FileName as Key and Binary image as
>> value. i.e.  Text and BytesWritable.
>> I am able to read these sequence files into Map Reduce programs.
>>
>> Step 2.
>> I understand that Text and BytesWritable are Non Serializable therefore, I
>> read the sequence file in Spark as following:
>>
>> SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
>> JavaSparkContext ctx = new JavaSparkContext(sparkConf);
>> JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0],
>> String.class, Byte.class) ;
>> final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();
>>
>>
>>
>>
>> The moment I try to call collect() method to get the keys of sequence
>> file,
>> following exception has been thrown
>>
>> Can any one help me understanding why collect() method is failing? If I
>> use
>> toArray() on seqFiles object then also I am getting same call stack.
>>
>> Regards
>> Tapan
>>
>>
>>
>> java.io.NotSerializableException: org.apache.hadoop.io.Text
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>> at
>>
>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>> at
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>> at
>>
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>> at
>> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
>> at
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
>> at
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>> at
>>
>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>> at
>>
>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>> at
>> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>> at
>>
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>> at java.lang.Thread.run(Thread.java:745)
>> 2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
>> scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage
>> 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not
>> retrying
>> 2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
>> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
>> 0.0, whose tasks have all completed, from pool
>> 2015-05-19 15:15:03,739 INFO
>> [sparkDriver-akka.actor.default-dispatcher-2]
>> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling
>> stage 0
>> 2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
>> (Logging.scala:logInfo(59)) - Job 0 failed: collect at
>> JavaSequenceFile.java:44, took 4.421397 s
>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>> due
>> to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
>> result: org.apache.hadoop.io.Text
>> at
>> org.apache.spark.scheduler.DAGScheduler.org
>> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>> at
>>
>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>> at
>> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>> at scala.Option.foreach(Option.scala:236)
>> at
>>
>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>> at
>>
>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGSched

Spark users

2015-05-19 Thread Ricardo Goncalves da Silva
Hi
I'm learning spark focused on data and machine learning. Migrating from SAS.

Is there a group for it? My questions are basic for now and I am getting very few
answers.

Tal

Rick.



Enviado do meu smartphone Samsung Galaxy.



The information contained in this transmission is privileged and confidential 
information intended only for the use of the individual or entity named above. 
If the reader of this message is not the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this communication 
is strictly prohibited. If you have received this transmission in error, do not 
read it. Please immediately reply to the sender that you have received this 
communication in error and then delete it.



sparkSQL - Hive metastore connection hangs with MS SQL server

2015-05-19 Thread jamborta
Hi all,

I am trying to set up an external metastore using Microsoft SQL on Azure; it
works ok initially but after about 5 mins inactivity it hangs, then times
out after 15 mins with this error:

15/05/20 00:02:49 ERROR ConnectionHandle: Database access problem. Killing
off this connection and all remaining connections in the connection pool.
SQL State = 08S01
15/05/20 00:02:49 ERROR RetryingHMSHandler: Retrying HMSHandler after 1000
ms (attempt 1 of 1) with error: javax.jdo.JDODataStoreException: SQL Server
did not return a 
response. The connection has been closed.
at
org.datanucleus.api.jdo.NucleusJDOHelper.getJDOExceptionForNucleusException(NucleusJDOHelper.java:451)
at org.datanucleus.api.jdo.JDOQuery.execute(JDOQuery.java:275)
at
org.apache.hadoop.hive.metastore.ObjectStore.getMTable(ObjectStore.java:901)
at
org.apache.hadoop.hive.metastore.ObjectStore.getTable(ObjectStore.java:833)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
NestedThrowablesStackTrace:
com.microsoft.sqlserver.jdbc.SQLServerException: SQL Server did not return a
response. The connection has been closed.
at
com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1668)
at
com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:1655)
at
com.microsoft.sqlserver.jdbc.TDSReader.readPacket(IOBuffer.java:4844)

I have also tried replacing BoneCP with DBCP in
datanucleus.connectionPoolingType, that didn't help either. 




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/sparkSQL-Hive-metastore-connection-hangs-with-MS-SQL-server-tp22950.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Naming an DF aggregated column

2015-05-19 Thread Michael Armbrust
customerDF.groupBy("state").agg(max($"discount").alias("newName"))

(or .as(...), both functions can take a String or a Symbol)

On Tue, May 19, 2015 at 2:11 PM, Cesar Flores  wrote:

>
> I would like to ask if there is a way of specifying the column name of a
> data frame aggregation. For example If I do:
>
> customerDF.groupBy("state").agg(max($"discount"))
>
> the name of my aggregated column will be: MAX('discount)
>
> Is there a way of changing the name of that column to something else on
> the fly, and not after performing the aggregation?
>
>
> thanks
> --
> Cesar Flores
>


Re: Spark SQL on large number of columns

2015-05-19 Thread ayan guha
One option is to batch up the columns and run the batches in sequence, as in the sketch below.
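
This is an illustration only: the table name, batch size and the particular
stats are placeholders, and each batch becomes its own small query so that no
single generated plan has to cover all 26k columns.

import org.apache.spark.sql.{DataFrame, SQLContext}

// Sketch: run the per-column stats in batches so that no single generated
// plan has to cover all the columns at once.
def statsInBatches(sqlContext: SQLContext, table: String,
                   columns: Seq[String], batchSize: Int = 200): Seq[DataFrame] = {
  columns.grouped(batchSize).map { batch =>
    val selectList = batch.flatMap { c =>
      Seq(s"max(cast($c as double)) as ${c}_max",
          s"min(cast($c as double)) as ${c}_min",
          s"avg(cast($c as double)) as ${c}_avg")
    }.mkString(", ")
    sqlContext.sql(s"SELECT $selectList FROM $table")  // one small plan per batch
  }.toVector
}
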
On 20 May 2015 00:20, "madhu phatak"  wrote:

> Hi,
> Another update, when run on more that 1000 columns I am getting
>
> Could not write class
> __wrapper$1$40255d281a0d4eacab06bcad6cf89b0d/__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d$$anonfun$wrapper$1$$anon$1
> because it exceeds JVM code size limits. Method apply's code too large!
>
>
>
>
>
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>
> On Tue, May 19, 2015 at 6:23 PM, madhu phatak 
> wrote:
>
>> Hi,
>> Tested with HiveContext also. It also take similar amount of time.
>>
>> To make the things clear, the following is select clause for a given
>> column
>>
>>
>> *aggregateStats( "$columnName" , max( cast($columnName as double)),   
>> |min(cast($columnName as double)), avg(cast($columnName as double)), 
>> count(*) )*
>>
>> aggregateStats is UDF generating case class to hold the values.
>>
>>
>>
>>
>>
>>
>>
>>
>> Regards,
>> Madhukara Phatak
>> http://datamantra.io/
>>
>> On Tue, May 19, 2015 at 5:57 PM, madhu phatak 
>> wrote:
>>
>>> Hi,
>>> Tested for calculating values for 300 columns. Analyser takes around 4
>>> minutes to generate the plan. Is this normal?
>>>
>>>
>>>
>>>
>>> Regards,
>>> Madhukara Phatak
>>> http://datamantra.io/
>>>
>>> On Tue, May 19, 2015 at 4:35 PM, madhu phatak 
>>> wrote:
>>>
 Hi,
 I am using spark 1.3.1




 Regards,
 Madhukara Phatak
 http://datamantra.io/

 On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) 
 wrote:

>  And which version are you using
>
> Sent from my iPhone
>
> On May 19, 2015, at 18:29, "ayan guha"  wrote:
>
>   can you kindly share your code?
>
> On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
> wrote:
>
>> Hi,
>> I  am trying run spark sql aggregation on a file with 26k columns. No
>> of rows is very small. I am running into issue that spark is taking huge
>> amount of time to parse the sql and create a logical plan. Even if i have
>> just one row, it's taking more than 1 hour just to get pass the parsing.
>> Any idea how to optimize in these kind of scenarios?
>>
>>
>>  Regards,
>>  Madhukara Phatak
>> http://datamantra.io/
>>
>
>
>
>  --
> Best Regards,
> Ayan Guha
>
>

>>>
>>
>


Hive 1.0 support in Spark

2015-05-19 Thread Kannan Rajah
Does Spark 1.3.1 support Hive 1.0? If not, which version of Spark will
start supporting Hive 1.0?

--
Kannan


Re: Multi user setup and saving a DataFrame / RDD to a network exported file system

2015-05-19 Thread Davies Liu
It surprises me, could you list the owner information of
/mnt/lustre/bigdata/med_home/tmp/test19EE/ ?

On Tue, May 19, 2015 at 8:15 AM, Tomasz Fruboes
 wrote:
> Dear Experts,
>
>  we have a spark cluster (standalone mode) in which master and workers are
> started from root account. Everything runs correctly to the point when we
> try doing operations such as
>
> dataFrame.select("name", "age").save(ofile, "parquet")
>
> or
>
> rdd.saveAsPickleFile(ofile)
>
> , where ofile is path on a network exported filesystem (visible on all
> nodes, in our case this is lustre, I guess on nfs effect would be similar).
>
>  Unsurprisingly temp files created on workers are owned by root, which then
> leads to a crash (see [1] below). Is there a solution/workaround for this
> (e.g. controlling file creation mode of the temporary files)?
>
> Cheers,
>  Tomasz
>
>
> ps I've tried to google this problem, couple of similar reports, but no
> clear answer/solution found
>
> ps2 For completeness - running master/workers as a regular user solves the
> problem only for the given user. For other users submitting to this master
> the result is given in [2] below
>
>
> [0] Cluster details:
> Master/workers: centos 6.5
> Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build)
>
>
> [1]
> ##
>File
> "/mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py",
> line 300, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o27.save.
> : java.io.IOException: Failed to rename
> DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet;
> isDirectory=false; length=534; replication=1; blocksize=33554432;
> modification_time=1432042832000; access_time=0; owner=; group=;
> permission=rw-rw-rw-; isSymlink=false} to
> file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
> at
> org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
> at
> parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43)
> at
> org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690)
> at
> org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129)
> at
> org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)
> at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
> at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1181)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:606)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
> at
> py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:259)
> at
> py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
> at py4j.commands.CallCommand.execute(CallCommand.java:79)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> ##
>
>
>
> [2]
> ##
> 15/05/19 14:45:19 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 3,
> wn23023.cis.gov.pl): java.io.IOException: Mkdirs failed to create
> file:/mnt/lustre/bigdata/med_home/tmp/test18/namesAndAges.parquet2/_temporary/0/_temporary/attempt_201505191445_0009_r_00_0
> at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
> at
> org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
> at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
> at
> parquet.hadoop.ParquetFileWriter.(ParquetFileWriter.java:154)
> at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:279)
> at
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
> at
> org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:667)
> at
> org

Re: Does Python 2.7 have to be installed on every cluster node?

2015-05-19 Thread Davies Liu
PySpark works with CPython by default, and you can specify which
version of Python to use by:

PYSPARK_PYTHON=path/to/python bin/spark-submit  xxx.py

When you do the upgrade, you could install python 2.7 on every machine
in the cluster, test it with

PYSPARK_PYTHON=python2.7 bin/spark-submit  xxx.py

For YARN, you also need to install python2.7 in every node in the cluster.

On Tue, May 19, 2015 at 7:44 AM, YaoPau  wrote:
> We're running Python 2.6.6 here but we're looking to upgrade to 2.7.x  in a
> month.
>
> Does pyspark work by converting Python into Java Bytecode, or does it run
> Python natively?
>
> And along those lines, if we're running in yarn-client mode, would we have
> to upgrade just the edge node version of Python, or every node in the
> cluster?
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Does-Python-2-7-have-to-be-installed-on-every-cluster-node-tp22945.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



How to set the file size for parquet Part

2015-05-19 Thread Richard Grossman
Hi

I'm using spark 1.3.1 and now I can't set the size of the generated parquet
part files.
The size is only 512 KB, which is really too small; I need to make them bigger.
How can I set this?
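
For reference, the two knobs that seem relevant are the number of output
partitions (one part file is written per partition) and Parquet's row-group
size. A rough, untested sketch in spark-shell terms, assuming a SparkContext
sc and a DataFrame df; the numbers and path are placeholders, and whether
1.3.1 picks up parquet.block.size from the Hadoop configuration this way is an
assumption:

// Larger row groups inside each file (Hadoop-level Parquet setting).
sc.hadoopConfiguration.setInt("parquet.block.size", 256 * 1024 * 1024)

// Fewer, larger part files: one part file is written per partition.
df.repartition(8).saveAsParquetFile("/path/to/output.parquet")
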
Thanks


Naming an DF aggregated column

2015-05-19 Thread Cesar Flores
I would like to ask if there is a way of specifying the column name of a
data frame aggregation. For example If I do:

customerDF.groupBy("state").agg(max($"discount"))

the name of my aggregated column will be: MAX('discount)

Is there a way of changing the name of that column to something else on the
fly, and not after performing the aggregation?


thanks
-- 
Cesar Flores


EOFException using KryoSerializer

2015-05-19 Thread Jim Carroll
I'm seeing the following exception ONLY when I run on a Mesos cluster. If I
run the exact same code with master set to "local[N]" I have no problem:

 2015-05-19 16:45:43,484 [task-result-getter-0] WARN  TaskSetManager - Lost
task 0.0 in stage 0.0 (TID 0, 10.253.1.101): java.io.EOFException
at
org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:142)
at
org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:216)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:177)
at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1153)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:58)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

KryoSerializer explicitly throws an EOFException. The comment says: 

// DeserializationStream uses the EOF exception to indicate stopping
condition.

Apparently this isn't what TorrentBroadcast expects.

Any suggestions? Thanks.

Jim





--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/EOFException-using-KryoSerializer-tp22948.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Exception when using CLUSTER BY or ORDER BY

2015-05-19 Thread Thomas Dudziak
Under certain circumstances that I haven't yet been able to isolate, I get
the following error when doing a HQL query using HiveContext (Spark 1.3.1
on Mesos, fine-grained mode). Is this a known problem or should I file a
JIRA for it ?


org.apache.spark.SparkException: Can only zip RDDs with same number of
elements in each partition
at 
org.apache.spark.rdd.RDD$$anonfun$zip$1$$anon$1.hasNext(RDD.scala:746)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at 
org.apache.spark.util.random.SamplingUtils$.reservoirSampleAndCount(SamplingUtils.scala:56)
at 
org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:259)
at 
org.apache.spark.RangePartitioner$$anonfun$8.apply(Partitioner.scala:257)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at org.apache.spark.rdd.RDD$$anonfun$15.apply(RDD.scala:647)
at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)


Re: rdd.sample() methods very slow

2015-05-19 Thread Sean Owen
The way these files are accessed is inherently sequential-access. There
isn't a way to in general know where record N is in a file like this and
jump to it. So they must be read to be sampled.
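
One practical workaround is to materialize the small sample once and reuse it,
instead of rescanning the 29.7 GB on every run. A rough sketch (the fraction is
sized for roughly 70 out of 7 million objects; Document and the paths are the
ones from the question):

import org.apache.spark.rdd.RDD

// Read once, sample down to roughly 70 objects, and keep the result around.
val full: RDD[Document] = sc.objectFile[Document]("C:/temp/docs.obj")
val sampled = full.sample(withReplacement = false, fraction = 1e-5, seed = 0L)
sampled.cache()                                      // reuse within this job
sampled.saveAsObjectFile("C:/temp/docs.sample.obj")  // reuse across jobs
println(sampled.count())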


On Tue, May 19, 2015 at 9:44 PM, Wang, Ningjun (LNG-NPV) <
ningjun.w...@lexisnexis.com> wrote:

>  Hi
>
>
>
> I have an RDD[Document] that contains 7 million objects and it is saved in
> file system as object file. I want to get a random sample of about 70
> objects from it using rdd.sample() method. It is ver slow
>
>
>
>
>
> val rdd : RDD[Document] =
> sc.objectFile[Document]("C:/temp/docs.obj").sample(false, 0.1D,
> 0L).cache()
>
> val count = rdd.count()
>
>
>
> From Spark UI, I see Spark is trying to read the entire object file in the
> folder "C:/temp/docs.obj", which is about 29.7 GB. Of course this is very
> slow. Why does Spark try to read all 7 million objects while I only need
> to return a random sample of 70 objects?
>
>
>
> Is there any efficient way to get a random sample of 70 objects without
> reading through the entire object files?
>
>
>
> Ningjun
>
>
>


Spark 1.3 classPath problem

2015-05-19 Thread Bill Q
Hi,
We have some Spark jobs that ran well under Spark 1.2 using spark-submit
--conf "spark.executor.extraClassPath=/etc/hbase/conf", and the Java HBase
driver code that Spark called could pick up the settings for HBase such as
ZooKeeper addresses.

But after upgrading to CDH 5.4.1 Spark 1.3, the Spark code stopped passing
the conf to the Java HBase drivers.

Any suggestions on how we can properly include the classPath?

Many thanks.


Bill



-- 
Many thanks.


Bill


rdd.sample() methods very slow

2015-05-19 Thread Wang, Ningjun (LNG-NPV)
Hi

I have an RDD[Document] that contains 7 million objects and it is saved in file 
system as object file. I want to get a random sample of about 70 objects from 
it using the rdd.sample() method. It is very slow


val rdd : RDD[Document] = 
sc.objectFile[Document]("C:/temp/docs.obj").sample(false, 0.1D, 0L).cache()
val count = rdd.count()

From Spark UI, I see Spark is trying to read the entire object file in the 
folder "C:/temp/docs.obj", which is about 29.7 GB. Of course this is very slow. 
Why does Spark try to read all 7 million objects while I only need to 
return a random sample of 70 objects?

Is there any efficient way to get a random sample of 70 objects without reading 
through the entire object files?

Ningjun



Re: LogisticRegressionWithLBFGS with large feature set

2015-05-19 Thread Xiangrui Meng
For ML applications, the best practice is to set the number of partitions
to match the number of cores, to reduce the shuffle size. You have 3072
partitions but 128 executors, which causes the overhead. For the
MultivariateOnlineSummarizer, we plan to add flags to specify what
need to be computed to reduce the overhead, in 1.5. -Xiangrui
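As a rough sketch of that first point (the names below are illustrative, not from the reported job):

import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD

// Assumption: `training` is the RDD[LabeledPoint] currently split into 3072 partitions.
// Coalescing it down to roughly the total core count (128 here is illustrative) reduces
// the number of per-partition summaries that treeAggregate has to merge and return.
val coalesced: RDD[LabeledPoint] = training.coalesce(128, shuffle = false).cache()
val model = new LogisticRegressionWithLBFGS().run(coalesced)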

On Mon, May 18, 2015 at 7:00 PM, Imran Rashid  wrote:
> I'm not super familiar with this part of the code, but from taking a quick
> look:
>
> a) the code creates a MultivariateOnlineSummarizer, which stores 7 doubles
> per feature (mean, max, min, etc. etc.)
> b) The limit is on the result size from *all* tasks, not from one task.  You
> start with 3072 tasks
> c) tree aggregate should first merge things down to about 8 partitions
> before bringing results back to the driver, which is how you end up with 54
> tasks at your failure.
>
> this means you should have about 30 MB per task per measure * 54 tasks * 7
> measures, which comes to about 11GB, or in the ballpark of what you found.
>
> In principle, you could get this working by adding more levels to the
> treeAggregate (the depth parameter), but looks like that isn't exposed.  You
> could also try coalescing your data down to a smaller set of partitions
> first, but that comes with other downsides.
>
> Perhaps an MLLib expert could chime in on an alternate approach.  My feeling
> (from a very quick look) is that there is room for some optimization in the
> internals
>
> Imran
>
> On Thu, May 14, 2015 at 5:44 PM, Pala M Muthaia
>  wrote:
>>
>> Hi,
>>
>> I am trying to validate our modeling data pipeline by running
>> LogisticRegressionWithLBFGS on a dataset with ~3.7 million features,
>> basically to compute AUC. This is on Spark 1.3.0.
>>
>> I am using 128 executors with 4 GB each + driver with 8 GB. The number of
>> data partitions is 3072
>>
>> The execution fails with the following messages:
>>
>> Total size of serialized results of 54 tasks (10.4 GB) is bigger than
>> spark.driver.maxResultSize (3.0 GB)
>>
>> The associated stage in the job is treeAggregate at
>> StandardScaler.scala:52 : The call stack looks as below:
>>
>> org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:996)
>> org.apache.spark.mllib.feature.StandardScaler.fit(StandardScaler.scala:52)
>>
>> org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:233)
>>
>> org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.run(GeneralizedLinearAlgorithm.scala:190)
>>
>>
>> I am trying to both understand why such large amount of data needs to be
>> passed back to driver as well as figure out a way around this. I also want
>> to understand how much memory is required, as a function of dataset size,
>> feature set size, and number of iterations performed, for future
>> experiments.
>>
>> From looking at the MLLib code, the largest data structure seems to be a
>> dense vector of the same size as the feature set. I am not familiar with
>> the algorithm or its implementation, but I would guess 3.7 million features would
>> lead to a constant multiple of ~3.7 * 8 ~ 30 MB. So how does the dataset
>> size become so large?
>>
>> I looked into the treeAggregate and it looks like hierarchical
>> aggregation. If the data being sent to the driver is basically the
>> aggregated coefficients (i.e. dense vectors) for the final aggregation,
>> can't the dense vectors from executors be pulled in one at a time and merged
>> in memory, rather than pulling all of them in together? (This is totally
>> uneducated guess so i may be completely off here).
>>
>> Is there a way to get this running?
>>
>> Thanks,
>> pala
>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: k-means core function for temporal geo data

2015-05-19 Thread Xiangrui Meng
I'm not sure whether k-means would converge with this customized
distance measure. You can list (weighted) time as a feature along with
coordinates, and then use Euclidean distance. For other supported
distance measures, you can check Derrick's package:
http://spark-packages.org/package/derrickburns/generalized-kmeans-clustering.
-Xiangrui
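A minimal sketch of that first suggestion (the time weight is an arbitrary placeholder you would have to tune for your units and use case):

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors

// Assumption: `points` is an RDD of (lat, lon, timestampInSeconds).
// The weight controls how much the time dimension contributes to the
// Euclidean distance relative to the spatial coordinates.
val timeWeight = 1e-4  // illustrative value only
val features = points.map { case (lat, lon, ts) =>
  Vectors.dense(lat, lon, ts * timeWeight)
}.cache()

val model = KMeans.train(features, 5 /* k */, 20 /* maxIterations */)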

On Mon, May 18, 2015 at 2:30 AM, Pa Rö  wrote:
> hello,
>
> i want to cluster geo data (lat, long, timestamp) with k-means. now i am searching for
> a good core (distance) function, but i cannot find a good paper or other sources for that.
> currently i multiply the time distance and the space distance:
>
> public static double dis(GeoData input1, GeoData input2)
> {
>double timeDis = Math.abs( input1.getTime() - input2.getTime() );
>double geoDis = geoDis(input1, input2); //extra function
>return timeDis*geoDis;
> }
>
> maybe someone knows a good core function for clustering temporal geo data?
> (need citation)

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



RE: Word2Vec with billion-word corpora

2015-05-19 Thread nate
Might also want to look at Y! post, looks like they are experimenting with 
similar efforts in large scale word2vec:

http://yahooeng.tumblr.com/post/118860853846/distributed-word2vec-on-top-of-pistachio



-Original Message-
From: Xiangrui Meng [mailto:men...@gmail.com] 
Sent: Tuesday, May 19, 2015 1:25 PM
To: Shilad Sen
Cc: user
Subject: Re: Word2Vec with billion-word corpora

With vocabulary size 4M and 400 vector size, you need 400 * 4M = 16B floats to 
store the model. That is 64GB. We store the model on the driver node in the 
current implementation. So I don't think it would work. You might try 
increasing the minCount to decrease the vocabulary size and reduce the vector 
size. I'm interested in learning the trade-off between the model size and the 
model quality. If you have done some experiments, please let me know. Thanks! 
-Xiangrui

On Wed, May 13, 2015 at 11:17 AM, Shilad Sen  wrote:
> Hi all,
>
> I'm experimenting with Spark's Word2Vec implementation for a 
> relatively large (5B word, vocabulary size 4M, 400-dimensional 
> vectors) corpora. Has anybody had success running it at this scale?
>
> Thanks in advance for your guidance!
>
> -Shilad
>
> --
> Shilad W. Sen
> Associate Professor
> Mathematics, Statistics, and Computer Science Dept.
> Macalester College
> s...@macalester.edu
> http://www.shilad.com
> https://www.linkedin.com/in/shilad
> 651-696-6273

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional 
commands, e-mail: user-h...@spark.apache.org



-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming + Kafka failure recovery

2015-05-19 Thread Cody Koeninger
If you checkpoint, the job will start from the successfully consumed
offsets.  If you don't checkpoint, by default it will start from the
highest available offset, and you will potentially lose data.

Is the link I posted, or for that matter the scaladoc, really not clear on
that point?

The scaladoc says:

To recover from driver failures, you have to enable checkpointing in the
StreamingContext. The information on consumed offset can be recovered from
the checkpoint.
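For reference, a hedged sketch of that recovery pattern (the topic, broker list and paths below are placeholders, not from this thread):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}
import org.apache.spark.streaming.kafka.KafkaUtils

val checkpointDir = "hdfs:///tmp/checkpoints/my-streaming-app"  // placeholder

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("direct-kafka-checkpoint-example")
  val ssc = new StreamingContext(conf, Seconds(60))
  ssc.checkpoint(checkpointDir)
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")  // placeholder
  val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("mytopic"))
  stream.foreachRDD { (rdd, time: Time) =>
    rdd.map(_._2).saveAsTextFile(s"hdfs:///tmp/logs/${time.milliseconds}")  // placeholder sink
  }
  ssc
}

// On restart, the consumed offsets are recovered from the checkpoint instead of
// starting from the highest available offset.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()

As noted earlier in the thread, this only holds as long as the application code is not upgraded between runs.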

On Tue, May 19, 2015 at 2:38 PM, Bill Jay 
wrote:

> If a Spark streaming job stops at 12:01 and I resume the job at 12:02.
> Will it still start to consume the data that were produced to Kafka at
> 12:01? Or it will just start consuming from the current time?
>
>
> On Tue, May 19, 2015 at 10:58 AM, Cody Koeninger 
> wrote:
>
>> Have you read
>> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ?
>>
>> 1.  There's nothing preventing that.
>>
>> 2. Checkpointing will give you at-least-once semantics, provided you have
>> sufficient kafka retention.  Be aware that checkpoints aren't recoverable
>> if you upgrade code.
>>
>> On Tue, May 19, 2015 at 12:42 PM, Bill Jay 
>> wrote:
>>
>>> Hi all,
>>>
>>> I am currently using Spark streaming to consume and save logs every hour
>>> in our production pipeline. The current setting is to run a crontab job to
>>> check every minute whether the job is still there and if not resubmit a
>>> Spark streaming job. I am currently using the direct approach for Kafka
>>> consumer. I have two questions:
>>>
>>> 1. In the direct approach, no offset is stored in zookeeper and no group
>>> id is specified. Can two consumers (one is Spark streaming and the other is
>>> a Kafak console consumer in Kafka package) read from the same topic from
>>> the brokers together (I would like both of them to get all messages, i.e.
>>> publish-subscribe mode)? What about two Spark streaming jobs read from the
>>> same topic?
>>>
>>> 2. How to avoid data loss if a Spark job is killed? Does checkpointing
>>> serve this purpose? The default behavior of Spark streaming is to read the
>>> latest logs. However, if a job is killed, can the new job resume from what
>>> was left to avoid loosing logs?
>>>
>>> Thanks!
>>>
>>> Bill
>>>
>>
>>
>


Re: Word2Vec with billion-word corpora

2015-05-19 Thread Xiangrui Meng
With vocabulary size 4M and 400 vector size, you need 400 * 4M = 16B
floats to store the model. That is 64GB. We store the model on the
driver node in the current implementation. So I don't think it would
work. You might try increasing the minCount to decrease the vocabulary
size and reduce the vector size. I'm interested in learning the
trade-off between the model size and the model quality. If you have
done some experiments, please let me know. Thanks! -Xiangrui
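A minimal sketch of those two knobs (the values are illustrative, and setMinCount is an assumption in that not every release exposes it):

import org.apache.spark.mllib.feature.Word2Vec
import org.apache.spark.rdd.RDD

// Assumption: `corpus` is an RDD[Seq[String]] of tokenized sentences.
// A higher minCount prunes rare words and shrinks the 4M vocabulary;
// a smaller vector size shrinks the per-word footprint held on the driver.
val word2vec = new Word2Vec()
  .setVectorSize(100)    // illustrative, down from 400
  .setMinCount(25)       // illustrative; assumes a release that exposes setMinCount
  .setNumPartitions(64)  // illustrative

val model = word2vec.fit(corpus)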

On Wed, May 13, 2015 at 11:17 AM, Shilad Sen  wrote:
> Hi all,
>
> I'm experimenting with Spark's Word2Vec implementation for a relatively
> large (5B word, vocabulary size 4M, 400-dimensional vectors) corpora. Has
> anybody had success running it at this scale?
>
> Thanks in advance for your guidance!
>
> -Shilad
>
> --
> Shilad W. Sen
> Associate Professor
> Mathematics, Statistics, and Computer Science Dept.
> Macalester College
> s...@macalester.edu
> http://www.shilad.com
> https://www.linkedin.com/in/shilad
> 651-696-6273

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Increase maximum amount of columns for covariance matrix for principal components

2015-05-19 Thread Xiangrui Meng
We use a dense array to store the covariance matrix on the driver
node. So its length is limited by the integer range, which is 65536 *
65536 (actually half). -Xiangrui

On Wed, May 13, 2015 at 1:57 AM, Sebastian Alfers
 wrote:
> Hello,
>
>
> in order to compute a huge dataset, the amount of columns to calculate the
> covariance matrix is limited:
>
> https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala#L129
>
> What is the reason behind this limitation and can it be extended?
>
> Greetings
>
> Sebastian

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: question about customize kmeans distance measure

2015-05-19 Thread Xiangrui Meng
MLlib only supports Euclidean distance for k-means. You can find
Bregman divergence support in Derrick's package:
http://spark-packages.org/package/derrickburns/generalized-kmeans-clustering.
Which distance measure do you want to use? -Xiangrui

On Tue, May 12, 2015 at 7:23 PM, June  wrote:
> Dear list,
>
>
>
> I am new to spark, and I want to use the kmeans algorithm in mllib package.
>
> I am wondering whether it is possible to customize the distance measure used
> by kmeans, and how?
>
>
>
> Many thanks!
>
>
>
> June

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Stratified sampling with DataFrames

2015-05-19 Thread Xiangrui Meng
You need to convert DataFrame to RDD, call sampleByKey, and then apply
the schema back to create DataFrame.

val df: DataFrame = ...
val schema = df.schema
val sampledRDD = df.rdd.keyBy(r => r.getAs[Int](0)).sampleByKey(...).values
val sampled = sqlContext.createDataFrame(sampledRDD, schema)
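Spelled out with illustrative values (the table name, key column and fractions below are made up, not part of the original question):

// sampleByKey takes a Map from key value to sampling fraction.
val fractions = Map(0 -> 0.1, 1 -> 0.2)

val df = sqlContext.table("some_table")  // placeholder source
val schema = df.schema
val sampledRDD = df.rdd
  .keyBy(r => r.getAs[Int](0))
  .sampleByKey(withReplacement = false, fractions = fractions, seed = 42L)
  .values
val sampled = sqlContext.createDataFrame(sampledRDD, schema)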

Hopefully this would be much easier in 1.5.

Best,
Xiangrui

On Mon, May 11, 2015 at 12:32 PM, Karthikeyan Muthukumar
 wrote:
> Hi,
> I'm in Spark 1.3.0 and my data is in DataFrames.
> I need operations like sampleByKey(), sampleByKeyExact().
> I saw the JIRA "Add approximate stratified sampling to DataFrame"
> (https://issues.apache.org/jira/browse/SPARK-7157).
> That's targeted for Spark 1.5, till that comes through, whats the easiest
> way to accomplish the equivalent of sampleByKey() and sampleByKeyExact() on
> DataFrames.
> Thanks & Regards
> MK
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark mllib kmeans

2015-05-19 Thread Xiangrui Meng
Just curious, what distance measure do you need? -Xiangrui

On Mon, May 11, 2015 at 8:28 AM, Jaonary Rabarisoa  wrote:
> take a look at this
> https://github.com/derrickburns/generalized-kmeans-clustering
>
> Best,
>
> Jao
>
> On Mon, May 11, 2015 at 3:55 PM, Driesprong, Fokko 
> wrote:
>>
>> Hi Paul,
>>
>> I would say that it should be possible, but you'll need a different
>> distance measure which conforms to your coordinate system.
>>
>> 2015-05-11 14:59 GMT+02:00 Pa Rö :
>>>
>>> hi,
>>>
>>> it is possible to use a custom distance measure and a other data typ as
>>> vector?
>>> i want cluster temporal geo datas.
>>>
>>> best regards
>>> paul
>>
>>
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: SQL UserDefinedType can't be saved in parquet file when using assembly jar

2015-05-19 Thread Xiangrui Meng
Hey Jaonary,

I saw this line in the error message:

org.apache.spark.sql.types.DataType$CaseClassStringParser$.apply(dataTypes.scala:163)

CaseClassStringParser is only used in older versions of Spark to parse
schema from JSON. So I suspect that the cluster was running on an old
version of Spark when you used spark-submit to run your assembly jar.

Best,
Xiangrui

On Mon, May 11, 2015 at 7:40 AM, Jaonary Rabarisoa  wrote:
> In this example, every thing work expect save to parquet file.
>
> On Mon, May 11, 2015 at 4:39 PM, Jaonary Rabarisoa 
> wrote:
>>
>> MyDenseVectorUDT do exist in the assembly jar and in this example all the
>> code is in a single file to make sure every thing is included.
>>
>> On Tue, Apr 21, 2015 at 1:17 AM, Xiangrui Meng  wrote:
>>>
>>> You should check where MyDenseVectorUDT is defined and whether it was
>>> on the classpath (or in the assembly jar) at runtime. Make sure the
>>> full class name (with package name) is used. Btw, UDTs are not public
>>> yet, so please use it with caution. -Xiangrui
>>>
>>> On Fri, Apr 17, 2015 at 12:45 AM, Jaonary Rabarisoa 
>>> wrote:
>>> > Dear all,
>>> >
>>> > Here is an example of code to reproduce the issue I mentioned in a
>>> > previous
>>> > mail about saving an UserDefinedType into a parquet file. The problem
>>> > here
>>> > is that the code works when I run it inside intellij idea but fails
>>> > when I
>>> > create the assembly jar and run it with spark-submit. I use the master
>>> > version of  Spark.
>>> >
>>> > @SQLUserDefinedType(udt = classOf[MyDenseVectorUDT])
>>> > class MyDenseVector(val data: Array[Double]) extends Serializable {
>>> >   override def equals(other: Any): Boolean = other match {
>>> > case v: MyDenseVector =>
>>> >   java.util.Arrays.equals(this.data, v.data)
>>> > case _ => false
>>> >   }
>>> > }
>>> >
>>> > class MyDenseVectorUDT extends UserDefinedType[MyDenseVector] {
>>> >   override def sqlType: DataType = ArrayType(DoubleType, containsNull =
>>> > false)
>>> >   override def serialize(obj: Any): Seq[Double] = {
>>> > obj match {
>>> >   case features: MyDenseVector =>
>>> > features.data.toSeq
>>> > }
>>> >   }
>>> >
>>> >   override def deserialize(datum: Any): MyDenseVector = {
>>> > datum match {
>>> >   case data: Seq[_] =>
>>> > new MyDenseVector(data.asInstanceOf[Seq[Double]].toArray)
>>> > }
>>> >   }
>>> >
>>> >   override def userClass: Class[MyDenseVector] = classOf[MyDenseVector]
>>> >
>>> > }
>>> >
>>> > case class Toto(imageAnnotation: MyDenseVector)
>>> >
>>> > object TestUserDefinedType {
>>> >
>>> >   case class Params(input: String = null,
>>> >partitions: Int = 12,
>>> > outputDir: String = "images.parquet")
>>> >
>>> >   def main(args: Array[String]): Unit = {
>>> >
>>> > val conf = new
>>> > SparkConf().setAppName("ImportImageFolder").setMaster("local[4]")
>>> >
>>> > val sc = new SparkContext(conf)
>>> > val sqlContext = new SQLContext(sc)
>>> >
>>> > import sqlContext.implicits._
>>> >
>>> > val rawImages = sc.parallelize((1 to 5).map(x => Toto(new
>>> > MyDenseVector(Array[Double](x.toDouble).toDF
>>> >
>>> > rawImages.printSchema()
>>> >
>>> > rawImages.show()
>>> >
>>> > rawImages.save("toto.parquet") // This fails with assembly jar
>>> > sc.stop()
>>> >
>>> >   }
>>> > }
>>> >
>>> >
>>> > My build.sbt is as follow :
>>> >
>>> > libraryDependencies ++= Seq(
>>> >   "org.apache.spark" %% "spark-core" % sparkVersion % "provided",
>>> >   "org.apache.spark" %% "spark-sql" % sparkVersion,
>>> >   "org.apache.spark" %% "spark-mllib" % sparkVersion
>>> > )
>>> >
>>> > assemblyMergeStrategy in assembly := {
>>> >   case PathList("javax", "servlet", xs @ _*) => MergeStrategy.first
>>> >   case PathList("org", "apache", xs @ _*) => MergeStrategy.first
>>> >   case PathList("org", "jboss", xs @ _*) => MergeStrategy.first
>>> > //  case PathList(ps @ _*) if ps.last endsWith ".html" =>
>>> > MergeStrategy.first
>>> > //  case "application.conf"=>
>>> > MergeStrategy.concat
>>> >   case m if m.startsWith("META-INF") => MergeStrategy.discard
>>> >   //case x =>
>>> >   //  val oldStrategy = (assemblyMergeStrategy in assembly).value
>>> >   //  oldStrategy(x)
>>> >   case _ => MergeStrategy.first
>>> > }
>>> >
>>> >
>>> > As I said, this code works without problem when I execute it inside
>>> > intellij
>>> > idea. But when generate the assembly jar with sbt-assembly and
>>> >
>>> > use spark-submit I got the following error :
>>> >
>>> > 15/04/17 09:34:01 INFO ParquetOutputFormat: Writer version is:
>>> > PARQUET_1_0
>>> > 15/04/17 09:34:01 ERROR Executor: Exception in task 3.0 in stage 2.0
>>> > (TID 7)
>>> > java.lang.IllegalArgumentException: Unsupported dataType:
>>> >
>>> > {"type":"struct","fields":[{"name":"imageAnnotation","type":{"type":"udt","class":"MyDenseVectorUDT","pyClass":null,"sqlType":{"type":"array","elementType":"double"

Re: Find KNN in Spark SQL

2015-05-19 Thread Xiangrui Meng
Spark SQL doesn't provide spatial features. Large-scale KNN is usually
combined with locality-sensitive hashing (LSH). This Spark package may
be helpful: http://spark-packages.org/package/mrsqueeze/spark-hash.
-Xiangrui
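As a very rough illustration of the LSH idea in plain Spark (this is not the spark-hash package's API, which I have not checked; every name below is a hypothetical sketch):

import scala.util.Random
import org.apache.spark.rdd.RDD

val dim = 200
val numPlanes = 16
val rng = new Random(42L)
// Random hyperplanes; points with the same sign pattern across all planes share a bucket.
val planes: Array[Array[Double]] = Array.fill(numPlanes)(Array.fill(dim)(rng.nextGaussian()))

def signature(p: Array[Double]): Int =
  planes.zipWithIndex.map { case (w, i) =>
    if ((0 until dim).map(j => w(j) * p(j)).sum >= 0) (1 << i) else 0
  }.sum

// Assumption: `points` is an RDD[(Long, Array[Double])] of (id, vector).
val buckets = points.map { case (id, v) => (signature(v), (id, v)) }.cache()

def euclidean(a: Array[Double], b: Array[Double]): Double =
  math.sqrt((0 until dim).map { j => val d = a(j) - b(j); d * d }.sum)

// Serve a query by exact search restricted to the query's bucket.
def knn(query: Array[Double], k: Int): Array[(Long, Double)] = {
  val sig = signature(query)
  implicit val byDistance: Ordering[(Long, Double)] = Ordering.by(_._2)
  buckets.filter(_._1 == sig)
    .map { case (_, (id, v)) => (id, euclidean(query, v)) }
    .takeOrdered(k)
}

In practice you would use several hash tables (and fewer bits per table) to trade recall against bucket size; a packaged implementation should handle that for you.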

On Sat, May 9, 2015 at 9:25 PM, Dong Li  wrote:
> Hello experts,
>
> I’m new to Spark, and want to find K nearest neighbors on huge scale 
> high-dimension points dataset in very short time.
>
> The scenario is: the dataset contains more than 10 million points, whose 
> dimension is 200d. I’m building a web service, to receive one new point at 
> each request and return K nearest points inside that dataset, also need to 
> ensure the time-cost not very high. I have a cluster with several high-memory 
> nodes for this service.
>
> Currently I only have these ideas here:
> 1. To create several ball-tree instances in each node when service 
> initializing. This is fast, but not perform well at data scaling ability. I 
> cannot insert new nodes to the ball-trees unless I restart the services and 
> rebuild them.
> 2. To use sql based solution. Some database like PostgreSQL and SqlServer 
> have features on spatial search. But these database may not perform well in 
> big data environment. (Does SparkSQL have Spatial features or spatial index?)
>
> Based on your experience, can I achieve this scenario in Spark SQL? Or do you 
> know other projects in Spark stack acting well for this?
> Any ideas are appreciated, thanks very much.
>
> Regards,
> Dong
>
>
>
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: How to implement an Evaluator for a ML pipeline?

2015-05-19 Thread Xiangrui Meng
The documentation needs to be updated to state that higher metric
values are better (https://issues.apache.org/jira/browse/SPARK-7740).
I don't know why if you negate the return value of the Evaluator you
still get the highest regularization parameter candidate. Maybe you
should check the log messages from CrossValidator and see the average
metric values during cross validation. -Xiangrui
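For what it's worth, a rough sketch of an RMSE-style Evaluator against the 1.3 API (the "prediction" and "rating" column names are assumptions about the pipeline, and the metric is negated precisely because of the higher-is-better convention above):

import org.apache.spark.ml.Evaluator
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.DataFrame

val rmseEvaluator = new Evaluator {
  override def evaluate(dataset: DataFrame, paramMap: ParamMap): Double = {
    // Cast defensively: ALS writes float predictions and the rating column
    // may well be an integer type.
    val squaredErrors = dataset
      .selectExpr("CAST(prediction AS DOUBLE)", "CAST(rating AS DOUBLE)")
      .rdd
      .map(row => { val d = row.getDouble(0) - row.getDouble(1); d * d })
    // CrossValidator keeps the model with the largest metric, so return -RMSE.
    -math.sqrt(squaredErrors.mean())
  }
}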

On Sat, May 9, 2015 at 12:15 PM, Stefan H.  wrote:
> Hello everyone,
>
> I am stuck with the (experimental, I think) API for machine learning
> pipelines. I have a pipeline with just one estimator (ALS) and I want it to
> try different values for the regularization parameter. Therefore I need to
> supply an Evaluator that returns a value of type Double. I guess this could
> be something like accuracy or mean squared error? The only implementation I
> found is BinaryClassificationEvaluator, and I did not understand the
> computation there.
>
> I could not find detailed documentation so I implemented a dummy Evaluator
> that just returns the regularization parameter:
>
>   new Evaluator {
> def evaluate(dataset: DataFrame, paramMap: ParamMap): Double =
>   paramMap.get(als.regParam).getOrElse(throw new Exception)
>   }
>
> I just wanted to see whether the lower or higher value "wins". On the
> resulting model I inspected the chosen regularization parameter this way:
>
>   cvModel.bestModel.fittingParamMap.get(als.regParam)
>
> And it was the highest of my three regularization parameter candidates.
> Strange thing is, if I negate the return value of the Evaluator, that line
> still returns the highest regularization parameter candidate.
>
> So I am probably working with false assumptions. I'd be grateful if someone
> could point me to some documentation or examples, or has a few hints to
> share.
>
> Cheers,
> Stefan
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/How-to-implement-an-Evaluator-for-a-ML-pipeline-tp22830.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Add to Powered by Spark page

2015-05-19 Thread Michal Klos
Hi,

We would like to be added to the Powered by Spark list:

organization name: Localytics
URL: http://eng.localytics.com/
a list of which Spark components you are using: Spark, Spark Streaming,
MLLib
a short description of your use case: Batch, real-time, and predictive
analytics driving our mobile app analytics and marketing automation product.

thanks,

M


Re: RandomSplit with Spark-ML and Dataframe

2015-05-19 Thread Olivier Girardot
Thank you !

Le mar. 19 mai 2015 à 21:08, Xiangrui Meng  a écrit :

> In 1.4, we added RAND as a DataFrame expression, which can be used for
> random split. Please check the example here:
>
> https://github.com/apache/spark/blob/master/python/pyspark/ml/tuning.py#L214.
> 
> -Xiangrui
> 
>
> On Thu, May 7, 2015 at 8:39 AM, Olivier Girardot
>  wrote:
> > Hi,
> > is there any best practice to do like in MLLib a randomSplit of
> > training/cross-validation set with dataframes and the pipeline API ?
> >
> > Regards
> >
> > Olivier.
>


Re: Discretization

2015-05-19 Thread Xiangrui Meng
Thanks for asking! We should improve the documentation. The sample
dataset is actually mimicking the MNIST digits dataset, where the
values are gray levels (0-255). So by dividing by 16, we want to map
it to 16 coarse bins for the gray levels. Actually, there is a bug in
the doc, we should convert the values to integer first before dividing
by 16. I created https://issues.apache.org/jira/browse/SPARK-7739 for
this issue. Welcome to submit a patch:) Thanks!
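Concretely, a corrected version of that snippet would look something like this (a sketch of the intended fix, not necessarily the wording that will end up in the docs):

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint

// Convert the 0-255 gray levels to integers first, then integer-divide by 16
// so each feature falls into one of 16 categorical bins (0-15).
val discretizedData = data.map { lp =>
  LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => (x.toInt / 16).toDouble }))
}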

Best,
Xiangrui

On Thu, May 7, 2015 at 9:20 PM, spark_user_2015  wrote:
> The Spark documentation shows the following example code:
>
> // Discretize data in 16 equal bins since ChiSqSelector requires categorical
> features
> val discretizedData = data.map { lp =>
>   LabeledPoint(lp.label, Vectors.dense(lp.features.toArray.map { x => x / 16
> } ) )
> }
>
> I'm sort of missing why "x / 16" is considered a discretization approach
> here.
>
> [https://spark.apache.org/docs/latest/mllib-feature-extraction.html#feature-selection]
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/Discretization-tp22811.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark Streaming + Kafka failure recovery

2015-05-19 Thread Bill Jay
If a Spark streaming job stops at 12:01 and I resume the job at 12:02. Will
it still start to consume the data that were produced to Kafka at 12:01? Or
it will just start consuming from the current time?


On Tue, May 19, 2015 at 10:58 AM, Cody Koeninger  wrote:

> Have you read
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ?
>
> 1.  There's nothing preventing that.
>
> 2. Checkpointing will give you at-least-once semantics, provided you have
> sufficient kafka retention.  Be aware that checkpoints aren't recoverable
> if you upgrade code.
>
> On Tue, May 19, 2015 at 12:42 PM, Bill Jay 
> wrote:
>
>> Hi all,
>>
>> I am currently using Spark streaming to consume and save logs every hour
>> in our production pipeline. The current setting is to run a crontab job to
>> check every minute whether the job is still there and if not resubmit a
>> Spark streaming job. I am currently using the direct approach for Kafka
>> consumer. I have two questions:
>>
>> 1. In the direct approach, no offset is stored in zookeeper and no group
>> id is specified. Can two consumers (one is Spark streaming and the other is
>> a Kafak console consumer in Kafka package) read from the same topic from
>> the brokers together (I would like both of them to get all messages, i.e.
>> publish-subscribe mode)? What about two Spark streaming jobs read from the
>> same topic?
>>
>> 2. How to avoid data loss if a Spark job is killed? Does checkpointing
>> serve this purpose? The default behavior of Spark streaming is to read the
>> latest logs. However, if a job is killed, can the new job resume from what
>> was left to avoid loosing logs?
>>
>> Thanks!
>>
>> Bill
>>
>
>


Re: Implicit matrix factorization returning different results between spark 1.2.0 and 1.3.0

2015-05-19 Thread Xiangrui Meng
In implicit feedback model, the coefficients were already penalized
(towards zero) by the number of unobserved ratings. So I think it is
fair to keep the 1.3.0 weighting (by the number of total users/items).
Again, I don't think we have a clear answer. It would be nice to run
some experiments and see which works better. -Xiangrui

On Thu, May 7, 2015 at 9:35 AM, Ravi Mody  wrote:
> After thinking about it more, I do think weighting lambda by sum_i cij is
> the equivalent of the ALS-WR paper's approach for the implicit case. This
> provides scale-invariance for varying products/users and for varying
> ratings, and should behave well for all alphas. What do you guys think?
>
> On Wed, May 6, 2015 at 12:29 PM, Ravi Mody  wrote:
>>
>> Whoops I just saw this thread, it got caught in my spam filter. Thanks for
>> looking into this Xiangrui and Sean.
>>
>> The implicit situation does seem fairly complicated to me. The cost
>> function (not including the regularization term) is affected both by the
>> number of ratings and by the number of user/products. As we increase alpha
>> the contribution to the cost function from the number of users/products
>> diminishes compared to the contribution from the number of ratings. So large
>> alphas seem to favor the weighted-lambda approach, even though it's not a
>> perfect match. Smaller alphas favor Xiangrui's 1.3.0 approach, but again
>> it's not a perfect match.
>>
>> I believe low alphas won't work well with regularization because both
>> terms in the cost function will just push everything to zero. Some of my
>> experiments confirm this. This leads me to think that weighted-lambda would
>> work better in practice, but I have no evidence of this. It may make sense
>> to weight lambda by sum_i cij instead?
>>
>>
>>
>>
>>
>> On Wed, Apr 1, 2015 at 7:59 PM, Xiangrui Meng  wrote:
>>>
>>> Ravi, we just merged https://issues.apache.org/jira/browse/SPARK-6642
>>> and used the same lambda scaling as in 1.2. The change will be
>>> included in Spark 1.3.1, which will be released soon. Thanks for
>>> reporting this issue! -Xiangrui
>>>
>>> On Tue, Mar 31, 2015 at 8:53 PM, Xiangrui Meng  wrote:
>>> > I created a JIRA for this:
>>> > https://issues.apache.org/jira/browse/SPARK-6637. Since we don't have
>>> > a clear answer about how the scaling should be handled. Maybe the best
>>> > solution for now is to switch back to the 1.2 scaling. -Xiangrui
>>> >
>>> > On Tue, Mar 31, 2015 at 2:50 PM, Sean Owen  wrote:
>>> >> Ah yeah I take your point. The squared error term is over the whole
>>> >> user-item matrix, technically, in the implicit case. I suppose I am
>>> >> used to assuming that the 0 terms in this matrix are weighted so much
>>> >> less (because alpha is usually large-ish) that they're almost not
>>> >> there, but they are. So I had just used the explicit formulation.
>>> >>
>>> >> I suppose the result is kind of scale invariant, but not exactly. I
>>> >> had not prioritized this property since I had generally built models
>>> >> on the full data set and not a sample, and had assumed that lambda
>>> >> would need to be retuned over time as the input grew anyway.
>>> >>
>>> >> So, basically I don't know anything more than you do, sorry!
>>> >>
>>> >> On Tue, Mar 31, 2015 at 10:41 PM, Xiangrui Meng 
>>> >> wrote:
>>> >>> Hey Sean,
>>> >>>
>>> >>> That is true for explicit model, but not for implicit. The ALS-WR
>>> >>> paper doesn't cover the implicit model. In implicit formulation, a
>>> >>> sub-problem (for v_j) is:
>>> >>>
>>> >>> min_{v_j} \sum_i c_ij (p_ij - u_i^T v_j)^2 + lambda * X * \|v_j\|_2^2
>>> >>>
>>> >>> This is a sum for all i but not just the users who rate item j. In
>>> >>> this case, if we set X=m_j, the number of observed ratings for item
>>> >>> j,
>>> >>> it is not really scale invariant. We have #users user vectors in the
>>> >>> least squares problem but only penalize lambda * #ratings. I was
>>> >>> suggesting using lambda * m directly for implicit model to match the
>>> >>> number of vectors in the least squares problem. Well, this is my
>>> >>> theory. I don't find any public work about it.
>>> >>>
>>> >>> Best,
>>> >>> Xiangrui
>>> >>>
>>> >>> On Tue, Mar 31, 2015 at 5:17 AM, Sean Owen 
>>> >>> wrote:
>>>  I had always understood the formulation to be the first option you
>>>  describe. Lambda is scaled by the number of items the user has rated
>>>  /
>>>  interacted with. I think the goal is to avoid fitting the tastes of
>>>  prolific users disproportionately just because they have many
>>>  ratings
>>>  to fit. This is what's described in the ALS-WR paper we link to on
>>>  the
>>>  Spark web site, in equation 5
>>> 
>>>  (http://www.grappa.univ-lille3.fr/~mary/cours/stats/centrale/reco/paper/MatrixFactorizationALS.pdf)
>>> 
>>>  I think this also gets you the scale-invariance? For every
>>>  additional
>>>  rating from user i to product j, you add one new term to the
>>>  squared-err

Re: Code error

2015-05-19 Thread Stephen Boesch
Hi Ricardo,
Providing the error output would help. But in any case you need to do a
collect() on the rdd returned from computeCost.

2015-05-19 11:59 GMT-07:00 Ricardo Goncalves da Silva <
ricardog.si...@telefonica.com>:

>  Hi,
>
>
>
> Can anybody see what’s wrong in this piece of code:
>
>
>
>
>
> ./bin/spark-shell --num-executors 2 --executor-memory 512m --master
> yarn-client
>
> import org.apache.spark.mllib.clustering.KMeans
>
> import org.apache.spark.mllib.linalg.Vectors
>
>
>
>
>
> val data = sc.textFile("/user/p_loadbd/fraude5.csv").map(x =>
> x.toLowerCase.split(',')).map(x => x(0)+","+x(1))
>
> val header = data.first()
>
> val filter_data = data.filter(x => x != header)
>
> val parsedData = data.map(s =>
> Vectors.dense(s.split(',').map(_.toDouble))).cache()
>
>
>
> val numClusters = 2
>
> val numIterations = 20
>
> val clusters = KMeans.train(parsedData, numClusters, numIterations)
>
>
>
> val WSSSE = clusters.computeCost(parsedData)
>
> println("Within Set Sum of Squared Errors = " + WSSSE)
>
>
>
> Thanks.
>
>
>
>
>
>
> *Ricardo Goncalves da Silva*
> Lead Data Scientist *|* Seção de Desenvolvimento de Sistemas de
>
> Business Intelligence – Projetos de Inovação *| *IDPB02
>
> Av. Eng. Luis Carlos Berrini, 1.376 – 7º – 04571-000 - SP
>
> ricardog.si...@telefonica.com *|* www.telefonica.com.br
>
> Tel +55 11 3430 4955 *| *Cel +55 11 94292 9526
>
>
>
>
>
> --
>
> Este mensaje y sus adjuntos se dirigen exclusivamente a su destinatario,
> puede contener información privilegiada o confidencial y es para uso
> exclusivo de la persona o entidad de destino. Si no es usted. el
> destinatario indicado, queda notificado de que la lectura, utilización,
> divulgación y/o copia sin autorización puede estar prohibida en virtud de
> la legislación vigente. Si ha recibido este mensaje por error, le rogamos
> que nos lo comunique inmediatamente por esta misma vía y proceda a su
> destrucción.
>
> The information contained in this transmission is privileged and
> confidential information intended only for the use of the individual or
> entity named above. If the reader of this message is not the intended
> recipient, you are hereby notified that any dissemination, distribution or
> copying of this communication is strictly prohibited. If you have received
> this transmission in error, do not read it. Please immediately reply to the
> sender that you have received this communication in error and then delete
> it.
>
> Esta mensagem e seus anexos se dirigem exclusivamente ao seu destinatário,
> pode conter informação privilegiada ou confidencial e é para uso exclusivo
> da pessoa ou entidade de destino. Se não é vossa senhoria o destinatário
> indicado, fica notificado de que a leitura, utilização, divulgação e/ou
> cópia sem autorização pode estar proibida em virtude da legislação vigente.
> Se recebeu esta mensagem por erro, rogamos-lhe que nos o comunique
> imediatamente por esta mesma via e proceda a sua destruição
>


Re: User Defined Type (UDT)

2015-05-19 Thread Xiangrui Meng
(Note that UDT is not a public API yet.)

On Thu, May 7, 2015 at 7:11 AM, wjur  wrote:
> Hi all!
>
> I'm using Spark 1.3.0 and I'm struggling with a definition of a new type for
> a project I'm working on. I've created a case class Person(name: String) and
> now I'm trying to make Spark able to serialize and deserialize the
> defined type. I made a couple of attempts but none of them worked 100%
> (there were issues either in serialization or deserialization).
>
> This is my class and the corresponding UDT.
>
> @SQLUserDefinedType(udt = classOf[PersonUDT])
> case class Person(name: String)
>
> class PersonUDT extends UserDefinedType[Person] {
>   override def sqlType: DataType = StructType(Seq(StructField("name",
> StringType)))
>
>   override def serialize(obj: Any): Seq[Any] = {

This should return a Row instance instead of Seq[Any], because the
sqlType is a struct type.
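A minimal sketch of that change (keeping the rest of the class as quoted below, and with the matching deserialize shown for completeness):

import org.apache.spark.sql.Row

override def serialize(obj: Any): Any = obj match {
  // Wrap the field values in a Row so the result matches the declared StructType.
  case c: Person => Row(c.name)
}

override def deserialize(datum: Any): Person = datum match {
  case row: Row => Person(row.getString(0))
}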

> obj match {
>   case c: Person =>
> Seq(c.name)
> }
>   }
>
>   override def userClass: Class[Person] = classOf[Person]
>
>   override def deserialize(datum: Any): Person = {
> datum match {
>   case values: Seq[_] =>
> assert(values.length == 1)
> Person(values.head.asInstanceOf[String])
>   case values: util.ArrayList[_] =>
> Person(values.get(0).asInstanceOf[String])
> }
>   }
>
>   // In some other attempt I was creating RDD of Seq with manually
> serialized data and
>   // I had to override equals because two DFs with the same type weren't
> actually equal
>   // StructField(person,...types.PersonUDT@a096ac3)
>   // StructField(person,...types.PersonUDT@613fd937)
>   def canEqual(other: Any): Boolean = other.isInstanceOf[PersonUDT]
>
>   override def equals(other: Any): Boolean = other match {
> case that: PersonUDT => true
> case _ => false
>   }
>
>   override def hashCode(): Int = 1
> }
>
> This is how I create RDD of Person and then try to create a DataFrame
> val rdd = sparkContext.parallelize((1 to 100).map(i => Person(i.toString)))
> val sparkDataFrame = sqlContext.createDataFrame(rdd)
>
> The second line throws an exception:
> java.lang.ClassCastException: types.PersonUDT cannot be cast to
> org.apache.spark.sql.types.StructType
> at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:316)
>
> I looked into the code in SQLContext.scala and it seems that the code
> requires UDT to be extending StructType but in fact it extends
> UserDefinedType which extends directly DataType.
> I'm not sure whether it is a bug or I just don't know how to use UDTs.
>
> Do you have any suggestions how to solve this? I based my UDT on
> ExamplePointUDT but it seems to be incorrect. Is there a working example for
> UDT?
>
>
> Thank you for the reply in advance!
> wjur
>
>
>
> --
> View this message in context: 
> http://apache-spark-user-list.1001560.n3.nabble.com/User-Defined-Type-UDT-tp22796.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: RandomSplit with Spark-ML and Dataframe

2015-05-19 Thread Xiangrui Meng
In 1.4, we added RAND as a DataFrame expression, which can be used for
random split. Please check the example here:
https://github.com/apache/spark/blob/master/python/pyspark/ml/tuning.py#L214.
-Xiangrui

On Thu, May 7, 2015 at 8:39 AM, Olivier Girardot
 wrote:
> Hi,
> is there any best practice to do like in MLLib a randomSplit of
> training/cross-validation set with dataframes and the pipeline API ?
>
> Regards
>
> Olivier.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: MLlib libsvm isssues with data

2015-05-19 Thread Xiangrui Meng
The index should start from 1 for LIBSVM format, as defined in the README
of LIBSVM (https://github.com/cjlin1/libsvm/blob/master/README#L64). The
only exception is the precomputed kernel, which MLlib doesn't support.
-Xiangrui
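For example, a well-formed LIBSVM line uses 1-based, strictly increasing indices, like the following (the values are made up):

0 1:0.5 3:1.2 200:4.0
1 2:0.8 4:2.0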

On Wed, May 6, 2015 at 1:42 AM, doyere  wrote:

> Hi all,
>
> After doing some tests, I finally solved it. I'm writing it here for other people who
> run into this question. Here's an example of the data format error I faced:
>
> 0 0:0 1:0 2:1
> 1 1:1 3:2
>
> the data entries 0:0 and 1:0/1:1 are the reason for the
> ArrayIndexOutOfBoundsException. If you faced the same question, just
> delete them from your data or update them. Since the data worked in
> libsvm-tools, I guess Spark MLlib just implements things a bit differently.
>
> Original Message
> *From:* doyere
> *To:* user
> *Sent:* 2015-05-06 (Wed) 08:59
> *Subject:* MLlib libsvm isssues with data
>
> hi all:
>
> I've met an issue with MLlib. I posted it to the community earlier but it seems I put it in the
> wrong place :(. Then I put it on Stack Overflow; for well-formatted details please
> see
> http://stackoverflow.com/questions/30048344/spark-mllib-libsvm-isssues-with-data. Hope
> someone could help [image: 😢]
>
> I guess it's due to my data, but I've tested it with libsvm-tools and it worked
> well, and I've used the libsvm python data format check tool and it's
> ok. I just don't know why it errors with java.lang.
> ArrayIndexOutOfBoundsException: -1 :(
>
> And this is my first time using the mailing list to ask for help. If I did
> something wrong or did not describe things clearly, please tell me.
>
>
> doye
>


Code error

2015-05-19 Thread Ricardo Goncalves da Silva
Hi,

Can anybody see what's wrong in this piece of code:


./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors


val data = sc.textFile("/user/p_loadbd/fraude5.csv").map(x => 
x.toLowerCase.split(',')).map(x => x(0)+","+x(1))
val header = data.first()
val filter_data = data.filter(x => x != header)
val parsedData = data.map(s => 
Vectors.dense(s.split(',').map(_.toDouble))).cache()

val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)

val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)

Thanks.



Ricardo Goncalves da Silva
Lead Data Scientist | Seção de Desenvolvimento de Sistemas de
Business Intelligence - Projetos de Inovação | IDPB02
Av. Eng. Luis Carlos Berrini, 1.376 - 7º - 04571-000 - SP
ricardog.si...@telefonica.com | 
www.telefonica.com.br
Tel +55 11 3430 4955 | Cel +55 11 94292 9526






Este mensaje y sus adjuntos se dirigen exclusivamente a su destinatario, puede 
contener información privilegiada o confidencial y es para uso exclusivo de la 
persona o entidad de destino. Si no es usted. el destinatario indicado, queda 
notificado de que la lectura, utilización, divulgación y/o copia sin 
autorización puede estar prohibida en virtud de la legislación vigente. Si ha 
recibido este mensaje por error, le rogamos que nos lo comunique inmediatamente 
por esta misma vía y proceda a su destrucción.

The information contained in this transmission is privileged and confidential 
information intended only for the use of the individual or entity named above. 
If the reader of this message is not the intended recipient, you are hereby 
notified that any dissemination, distribution or copying of this communication 
is strictly prohibited. If you have received this transmission in error, do not 
read it. Please immediately reply to the sender that you have received this 
communication in error and then delete it.

Esta mensagem e seus anexos se dirigem exclusivamente ao seu destinatário, pode 
conter informação privilegiada ou confidencial e é para uso exclusivo da pessoa 
ou entidade de destino. Se não é vossa senhoria o destinatário indicado, fica 
notificado de que a leitura, utilização, divulgação e/ou cópia sem autorização 
pode estar proibida em virtude da legislação vigente. Se recebeu esta mensagem 
por erro, rogamos-lhe que nos o comunique imediatamente por esta mesma via e 
proceda a sua destruição


Problem querying RDD using HiveThriftServer2.startWithContext functionality

2015-05-19 Thread fdmitriy
Hi,

I am trying to query a Spark RDD using the
HiveThriftServer2.startWithContext functionality and getting the following
Exception:

15/05/19 13:26:43 WARN thrift.ThriftCLIService: Error executing statement: 
java.lang.RuntimeException: java.lang.NullPointerException
at
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:84)
at
org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:37)
at
org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:64)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1548)
at
org.apache.hadoop.hive.shims.HadoopShimsSecure.doAs(HadoopShimsSecure.java:502)
at
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:60)
at com.sun.proxy.$Proxy27.executeStatementAsync(Unknown Source)
at
org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:237)
at
org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:392)
at
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1373)
at
org.apache.hive.service.cli.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1358)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at
org.apache.hive.service.auth.TSetIpAddressProcessor.process(TSetIpAddressProcessor.java:55)
at
org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:244)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
at org.apache.hadoop.hive.conf.HiveConf.getIntVar(HiveConf.java:1259)
at
org.apache.hive.service.cli.log.LogManager.createNewOperationLog(LogManager.java:101)
at
org.apache.hive.service.cli.log.LogManager.getOperationLogByOperation(LogManager.java:156)
at
org.apache.hive.service.cli.log.LogManager.registerCurrentThread(LogManager.java:120)
at
org.apache.hive.service.cli.session.HiveSessionImpl.runOperationWithLogCapture(HiveSessionImpl.java:714)
at
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:370)
at
org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:357)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at
org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:79)
... 19 more

Code:

import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.SparkContext._
import  org.apache.spark.sql.hive._

object FederatedQueryTest
{
  
  def main(args: Array[String]) 
  {
val sparkConf = new SparkConf().setAppName("FederatedQueryTest")
val sc =  new SparkContext(sparkConf)
...
val hoursAug = sqlContext.sql("SELECT H.Hour, H.DataStore, H.SchemaName,
H.TableName, H.ColumnName, H.EventAffectedCount, H.EventCount, " +
  "U.USERNAME, U.USERGROUP, U.LOCATION,
U.DEPARTMENT " +
  "FROM HOURS H 
 
" +
  "JOIN USERS U 
 
" + 
  "ON H.User = U.USERNAME")

hoursAug.registerTempTable("HOURS_AUGM")
hoursAug.show(100)

import  org.apache.spark.sql.hive.thriftserver._
HiveThriftServer2.startWithContext(sqlContext)
  }
}

Environment:

CDH 5.3
Spark 1.3.0 (upgraded from the default Spark 1.2.0 on CDH 5.3)
Hive Metastore is in MySQL

Configuration steps:

1. Rebuilt Spark with Hive support using the command:
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive
-Phive-thriftserver -DskipTests clean package
2. Replaced Spark Assembly jar with the result of the build.
3. Placed hive-site.xml into Spark conf directory.
4. Using Beeline to work with Spark Thrift Server. 
The "connect" command passes successfully, but any "select" or "show
tables" command results in the Null Pointer Exception with the stack trace
as shown above. However, when starting Spark Thrift Server from command line
using /usr/lib/spark/sbin/start-thriftserver.sh, I am able to see and 

Re: Spark Streaming + Kafka failure recovery

2015-05-19 Thread Cody Koeninger
Have you read
https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md ?

1.  There's nothing preventing that.

2. Checkpointing will give you at-least-once semantics, provided you have
sufficient kafka retention.  Be aware that checkpoints aren't recoverable
if you upgrade code.

On Tue, May 19, 2015 at 12:42 PM, Bill Jay 
wrote:

> Hi all,
>
> I am currently using Spark streaming to consume and save logs every hour
> in our production pipeline. The current setting is to run a crontab job to
> check every minute whether the job is still there and if not resubmit a
> Spark streaming job. I am currently using the direct approach for Kafka
> consumer. I have two questions:
>
> 1. In the direct approach, no offset is stored in zookeeper and no group
> id is specified. Can two consumers (one is Spark streaming and the other is
> a Kafak console consumer in Kafka package) read from the same topic from
> the brokers together (I would like both of them to get all messages, i.e.
> publish-subscribe mode)? What about two Spark streaming jobs read from the
> same topic?
>
> 2. How to avoid data loss if a Spark job is killed? Does checkpointing
> serve this purpose? The default behavior of Spark streaming is to read the
> latest logs. However, if a job is killed, can the new job resume from what
> was left to avoid loosing logs?
>
> Thanks!
>
> Bill
>


Spark Streaming + Kafka failure recovery

2015-05-19 Thread Bill Jay
Hi all,

I am currently using Spark streaming to consume and save logs every hour in
our production pipeline. The current setting is to run a crontab job to
check every minute whether the job is still there and if not resubmit a
Spark streaming job. I am currently using the direct approach for Kafka
consumer. I have two questions:

1. In the direct approach, no offset is stored in zookeeper and no group id
is specified. Can two consumers (one is Spark streaming and the other is a
Kafak console consumer in Kafka package) read from the same topic from the
brokers together (I would like both of them to get all messages, i.e.
publish-subscribe mode)? What about two Spark streaming jobs read from the
same topic?

2. How to avoid data loss if a Spark job is killed? Does checkpointing
serve this purpose? The default behavior of Spark streaming is to read the
latest logs. However, if a job is killed, can the new job resume from what
was left to avoid losing logs?

Thanks!

Bill


Re: Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Matei Zaharia
Yeah, this definitely seems useful there. There might also be some ways to cap 
the application in Mesos, but I'm not sure.

Matei

> On May 19, 2015, at 1:11 PM, Thomas Dudziak  wrote:
> 
> I'm using fine-grained for a multi-tenant environment which is why I would 
> welcome the limit of tasks per job :)
> 
> cheers,
> Tom
> 
> On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia  > wrote:
> Hey Tom,
> 
> Are you using the fine-grained or coarse-grained scheduler? For the 
> coarse-grained scheduler, there is a spark.cores.max config setting that will 
> limit the total # of cores it grabs. This was there in earlier versions too.
> 
> Matei
> 
> > On May 19, 2015, at 12:39 PM, Thomas Dudziak  > > wrote:
> >
> > I read the other day that there will be a fair number of improvements in 
> > 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a 
> > configurable limit for the number of tasks for jobs run on Mesos ? This 
> > would be a very simple yet effective way to prevent a job dominating the 
> > cluster.
> >
> > cheers,
> > Tom
> >
> 
> 



Re: Getting the best parameter set back from CrossValidatorModel

2015-05-19 Thread Joseph Bradley
Hi Justin & Ram,

To clarify, PipelineModel.stages is not private[ml]; only the PipelineModel
constructor is private[ml].  So it's safe to use pipelineModel.stages as a
Spark user.

Ram's example looks good.  Btw, in Spark 1.4 (and the current master
build), we've made a number of improvements to Params and Pipelines, so
this should become easier to use!

Joseph

On Sun, May 17, 2015 at 10:17 PM, Justin Yip 
wrote:

>
> Thanks Ram.
>
> Your sample look is very helpful. (there is a minor bug that
> PipelineModel.stages is hidden under private[ml], just need a wrapper
> around it. :)
>
> Justin
>
> On Sat, May 16, 2015 at 10:44 AM, Ram Sriharsha 
> wrote:
>
>> Hi Justin
>>
>> The CrossValidatorExample here
>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala
>> is a good example of how to set up an ML Pipeline for extracting a model
>> with the best parameter set.
>>
>> You set up the pipeline as in here:
>>
>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala#L73
>>
>> This pipeline is treated as an estimator and wrapped into a Cross
>> Validator to do grid search and return the model with the best parameters .
>> Once you have trained the best model as in here
>>
>> https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala#L93
>>
>> The result is a CrossValidatorModel which contains the best estimator
>> (i.e. the best pipeline above) and you can extract the best pipeline and
>> inquire its parameters as follows:
>>
>> // what are the best parameters?
>> val bestPipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
>> val stages = bestPipelineModel.stages
>>
>> val hashingStage = stages(1).asInstanceOf[HashingTF]
>> println(hashingStage.getNumFeatures)
>> val lrStage = stages(2).asInstanceOf[LogisticRegressionModel]
>> println(lrStage.getRegParam)
>>
>>
>>
>> Ram
>>
>> On Sat, May 16, 2015 at 3:17 AM, Justin Yip 
>> wrote:
>>
>>> Hello,
>>>
>>> I am using MLPipeline. I would like to extract the best parameter found
>>> by CrossValidator. But I cannot find much document about how to do it. Can
>>> anyone give me some pointers?
>>>
>>> Thanks.
>>>
>>> Justin
>>>
>>> --
>>> View this message in context: Getting the best parameter set back from
>>> CrossValidatorModel
>>> 
>>> Sent from the Apache Spark User List mailing list archive
>>>  at Nabble.com.
>>>
>>
>>
>


Re: spark streaming doubt

2015-05-19 Thread Shushant Arora
Thanks Akhil and Dibyendu.

In high-level receiver-based streaming, do the executors run on the receiver
nodes themselves to get data locality? Or is the data always transferred to
executor nodes, with the executor nodes differing in each run of the job while the
receiver nodes remain the same (same machines) throughout the life of the streaming
application unless a node failure happens?



On Tue, May 19, 2015 at 9:29 PM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Just to add, there is a Receiver based Kafka consumer which uses Kafka Low
> Level Consumer API.
>
> http://spark-packages.org/package/dibbhatt/kafka-spark-consumer
>
>
> Regards,
> Dibyendu
>
> On Tue, May 19, 2015 at 9:00 PM, Akhil Das 
> wrote:
>
>>
>> On Tue, May 19, 2015 at 8:10 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> So for Kafka+spark streaming, Receiver based streaming used highlevel
>>> api and non receiver based streaming used low level api.
>>>
>>> 1.In high level receiver based streaming does it registers consumers at
>>> each job start(whenever a new job is launched by streaming application say
>>> at each second)?
>>>
>>
>> ​-> Receiver based streaming will always have the receiver running
>> parallel while your job is running, So by default for every 200ms
>> (spark.streaming.blockInterval) the receiver will generate a block of data
>> which is read from Kafka.
>> ​
>>
>>
>>> 2.No of executors in highlevel receiver based jobs will always equal to
>>> no of partitions in topic ?
>>>
>>
>> ​-> Not sure from where did you came up with this. For the non stream
>> based one, i think the number of partitions in spark will be equal to the
>> number of kafka partitions for the given topic.
>> ​
>>
>>
>>> 3.Will data from a single topic be consumed by executors in parllel or
>>> only one receiver consumes in multiple threads and assign to executors in
>>> high level receiver based approach ?
>>>
>>> ​-> They will consume the data parallel.​ For the receiver based
>> approach, you can actually specify the number of receiver that you want to
>> spawn for consuming the messages.
>>
>>>
>>>
>>>
>>> On Tue, May 19, 2015 at 2:38 PM, Akhil Das 
>>> wrote:
>>>
 spark.streaming.concurrentJobs takes an integer value, not boolean. If
 you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
 next job will start once it completes the current one.


> Actually, in the current implementation of Spark Streaming and under
> default configuration, only job is active (i.e. under execution) at any
> point of time. So if one batch's processing takes longer than 10 seconds,
> then then next batch's jobs will stay queued.
> This can be changed with an experimental Spark property
> "spark.streaming.concurrentJobs" which is by default set to 1. Its not
> currently documented (maybe I should add it).
> The reason it is set to 1 is that concurrent jobs can potentially lead
> to weird sharing of resources and which can make it hard to debug the
> whether there is sufficient resources in the system to process the 
> ingested
> data fast enough. With only 1 job running at a time, it is easy to see 
> that
> if batch processing time < batch interval, then the system will be stable.
> Granted that this may not be the most efficient use of resources under
> certain conditions. We definitely hope to improve this in the future.


 Copied from TD's answer written in SO
 
 .

 Non-receiver based streaming for example you can say are the
 fileStream, directStream ones. You can read a bit of information from here
 https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 2:13 PM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> Thanks Akhil.
> When I don't  set spark.streaming.concurrentJobs to true. Will the
> all pending jobs starts one by one after 1 jobs completes,or it does not
> creates jobs which could not be started at its desired interval.
>
> And Whats the difference and usage of Receiver vs non-receiver based
> streaming. Is there any documentation for that?
>
> On Tue, May 19, 2015 at 1:35 PM, Akhil Das  > wrote:
>
>> It will be a single job running at a time by default (you can also
>> configure the spark.streaming.concurrentJobs to run jobs parallel which 
>> is
>> not recommended to put in production).
>>
>> Now, your batch duration being 1 sec and processing time being 2
>> minutes, if you are using a receiver based streaming then ideally those
>> receivers will keep on receiving data while the job is running (which 
>> will
>> accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
>> block not found exceptions as spark drops som

Re: Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Thomas Dudziak
I'm using fine-grained for a multi-tenant environment which is why I would
welcome the limit of tasks per job :)

cheers,
Tom

On Tue, May 19, 2015 at 10:05 AM, Matei Zaharia 
wrote:

> Hey Tom,
>
> Are you using the fine-grained or coarse-grained scheduler? For the
> coarse-grained scheduler, there is a spark.cores.max config setting that
> will limit the total # of cores it grabs. This was there in earlier
> versions too.
>
> Matei
>
> > On May 19, 2015, at 12:39 PM, Thomas Dudziak  wrote:
> >
> > I read the other day that there will be a fair number of improvements in
> 1.4 for Mesos. Could I ask for one more (if it isn't already in there): a
> configurable limit for the number of tasks for jobs run on Mesos ? This
> would be a very simple yet effective way to prevent a job dominating the
> cluster.
> >
> > cheers,
> > Tom
> >
>
>


PanTera Big Data Visualization built with Spark

2015-05-19 Thread Cyrus Handy
Hi,

Can you please add us to the list of Spark Users

Org: PanTera
URL: http://pantera.io
Components we are using:

   - PanTera uses a direct access to the Spark Scala API
   - Spark Core – SparkContext, JavaSparkContext, SparkConf, RDD, JavaRDD,
     Accumulable, AccumulableParam, Accumulator, AccumulatorParam,
     StorageLevel, Broadcast, HashPartitioner, Logging, KryoRegistrator,
     NewHadoopRDD, UnionRDD
   - We also use SharedSparkContext for testing, but we've made a copy
   - SparkSQL – SQLContext, JavaSQLContext, ByteType, IntType, LongType,
     FloatType, DoubleType, BooleanType, StringType, ArrayType,
     TimestampType, StructField, StructType, Row, GenericRow
   - Spark Streaming – we have done so in the past, and will again in the
     future, but not currently supported (when used: StreamingContext,
     DStream, Time)
   - GraphX – Graph, PartitionStrategy, Edge, EdgeTriplet, VertexRDD,
     graphToGraphOps

Use Case:

PanTera is a tool for exploring large datasets. It uses Spark to create XY
and geographic scatterplots from millions to billions of datapoints.

Please let me know if you have any questions.

Thanks very much,

Cyrus Handy
PanTera  Product Manager
Uncharted™  (Formerly Oculus Info Inc.)
Direct: 416-203-3003 x232
Mobile: 416-821-3025


Re: Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Matei Zaharia
Hey Tom,

Are you using the fine-grained or coarse-grained scheduler? For the 
coarse-grained scheduler, there is a spark.cores.max config setting that will 
limit the total # of cores it grabs. This was there in earlier versions too.

Matei
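
For the coarse-grained route, a minimal sketch of capping the cores a job
grabs; the master URL, app name and the cap value are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("CappedMesosJob")
  .setMaster("mesos://zk://zk-host:2181/mesos")
  .set("spark.mesos.coarse", "true")   // coarse-grained Mesos mode
  .set("spark.cores.max", "24")        // upper bound on the total cores this job will grab
val sc = new SparkContext(conf)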

> On May 19, 2015, at 12:39 PM, Thomas Dudziak  wrote:
> 
> I read the other day that there will be a fair number of improvements in 1.4 
> for Mesos. Could I ask for one more (if it isn't already in there): a 
> configurable limit for the number of tasks for jobs run on Mesos ? This would 
> be a very simple yet effective way to prevent a job dominating the cluster.
> 
> cheers,
> Tom
> 


-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Wish for 1.4: upper bound on # tasks in Mesos

2015-05-19 Thread Thomas Dudziak
I read the other day that there will be a fair number of improvements in
1.4 for Mesos. Could I ask for one more (if it isn't already in there): a
configurable limit for the number of tasks for jobs run on Mesos ? This
would be a very simple yet effective way to prevent a job dominating the
cluster.

cheers,
Tom


Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread N B
Thanks Imran. It does help clarify. I believe I had it right all along, then,
but was confused by the documentation talking about never changing the
broadcasted variables.

I've tried it in a local-mode process so far and it does seem to work as
intended. When (and if!) we start running on a real cluster, I hope this
holds up.

Thanks
NB


On Tue, May 19, 2015 at 6:25 AM, Imran Rashid  wrote:

> hmm, I guess it depends on the way you look at it.  In a way, I'm saying
> that spark does *not* have any built in "auto-re-broadcast" if you try to
> mutate a broadcast variable.  Instead, you should create something new, and
> just broadcast it separately.  Then just have all the code you have
> operating on your RDDs look at the new broadcast variable.
>
> But I guess there is another way to look at it -- you are creating new
> broadcast variables each time, but they all point to the same underlying
> mutable data structure.  So in a way, you are "rebroadcasting" the same
> underlying data structure.
>
> Let me expand my example from earlier a little bit more:
>
>
> def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
>  ...
> }
>
> // this is a val, because the data structure itself is mutable
> val myMutableDataStructure = ...
> // this is a var, because you will create new broadcasts
> var myBroadcast = sc.broadcast(myMutableDataStructure)
> (0 to 20).foreach { iteration =>
>   oneIteration(myRDD, myBroadcast)
>   // update your mutable data structure in place
>   myMutableDataStructure.update(...)
>   // ... but that doesn't affect the broadcast variables living out on the
> cluster, so we need to
>   // create a new one
>
>   // this line is not required -- the broadcast var will automatically get
> unpersisted when a gc
>   // cleans up the old broadcast on the driver, but I'm including this
> here for completeness,
>   // in case you want to more proactively clean up old blocks if you are
> low on space
>   myBroadcast.unpersist()
>
>   // now we create a new broadcast which has the updated data in our
> mutable data structure
>   myBroadcast = sc.broadcast(myMutableDataStructure)
> }
>
>
> hope this clarifies things!
>
> Imran
>
> On Tue, May 19, 2015 at 3:06 AM, N B  wrote:
>
>> Hi Imran,
>>
>> If I understood you correctly, you are suggesting to simply call
>> broadcast again from the driver program. This is exactly what I am hoping
>> will work as I have the Broadcast data wrapped up and I am indeed
>> (re)broadcasting the wrapper over again when the underlying data changes.
>> However, documentation seems to suggest that one cannot re-broadcast. Is my
>> understanding accurate?
>>
>> Thanks
>> NB
>>
>>
>> On Mon, May 18, 2015 at 6:24 PM, Imran Rashid 
>> wrote:
>>
>>> Rather than "updating" the broadcast variable, can't you simply create a
>>> new one?  When the old one can be gc'ed in your program, it will also get
>>> gc'ed from spark's cache (and all executors).
>>>
>>> I think this will make your code *slightly* more complicated, as you
>>> need to add in another layer of indirection for which broadcast variable to
>>> use, but not too bad.  Eg., from
>>>
>>> var myBroadcast = sc.broadcast( ...)
>>> (0 to 20).foreach{ iteration =>
>>>   //  ... some rdd operations that involve myBroadcast ...
>>>   myBroadcast.update(...) // wrong! dont' update a broadcast variable
>>> }
>>>
>>> instead do something like:
>>>
>>> def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit
>>> = {
>>>  ...
>>> }
>>>
>>> var myBroadcast = sc.broadcast(...)
>>> (0 to 20).foreach { iteration =>
>>>   oneIteration(myRDD, myBroadcast)
>>>   myBroadcast = sc.broadcast(...) // create a NEW broadcast here,
>>> with whatever you need to update it
>>> }
>>>
>>> On Sat, May 16, 2015 at 2:01 AM, N B  wrote:
>>>
 Thanks Ayan. Can we rebroadcast after updating in the driver?

 Thanks
 NB.


 On Fri, May 15, 2015 at 6:40 PM, ayan guha  wrote:

> Hi
>
> broadcast variables are shipped for the first time it is accessed in a
> transformation to the executors used by the transformation. It will NOT
> updated subsequently, even if the value has changed. However, a new value
> will be shipped to any new executor comes into play after the value has
> changed. This way, changing value of broadcast variable is not a good idea
> as it can create inconsistency within cluster. From documentatins:
>
>  In addition, the object v should not be modified after it is
> broadcast in order to ensure that all nodes get the same value of the
> broadcast variable
>
>
> On Sat, May 16, 2015 at 10:39 AM, N B  wrote:
>
>> Thanks Ilya. Does one have to call broadcast again once the
>> underlying data is updated in order to get the changes visible on all 
>> nodes?
>>
>> Thanks
>> NB
>>
>>
>> On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin 
>> wrote:
>>
>>> The broadcast variable is li

Re: spark streaming doubt

2015-05-19 Thread Dibyendu Bhattacharya
Just to add, there is a Receiver based Kafka consumer which uses Kafka Low
Level Consumer API.

http://spark-packages.org/package/dibbhatt/kafka-spark-consumer


Regards,
Dibyendu

On Tue, May 19, 2015 at 9:00 PM, Akhil Das 
wrote:

>
> On Tue, May 19, 2015 at 8:10 PM, Shushant Arora  > wrote:
>
>> So for Kafka+spark streaming, Receiver based streaming used highlevel api
>> and non receiver based streaming used low level api.
>>
>> 1.In high level receiver based streaming does it registers consumers at
>> each job start(whenever a new job is launched by streaming application say
>> at each second)?
>>
>
> ​-> Receiver based streaming will always have the receiver running
> parallel while your job is running, So by default for every 200ms
> (spark.streaming.blockInterval) the receiver will generate a block of data
> which is read from Kafka.
> ​
>
>
>> 2.No of executors in highlevel receiver based jobs will always equal to
>> no of partitions in topic ?
>>
>
> ​-> Not sure from where did you came up with this. For the non stream
> based one, i think the number of partitions in spark will be equal to the
> number of kafka partitions for the given topic.
> ​
>
>
>> 3.Will data from a single topic be consumed by executors in parllel or
>> only one receiver consumes in multiple threads and assign to executors in
>> high level receiver based approach ?
>>
>> ​-> They will consume the data parallel.​ For the receiver based
> approach, you can actually specify the number of receiver that you want to
> spawn for consuming the messages.
>
>>
>>
>>
>> On Tue, May 19, 2015 at 2:38 PM, Akhil Das 
>> wrote:
>>
>>> spark.streaming.concurrentJobs takes an integer value, not boolean. If
>>> you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
>>> next job will start once it completes the current one.
>>>
>>>
 Actually, in the current implementation of Spark Streaming and under
 default configuration, only job is active (i.e. under execution) at any
 point of time. So if one batch's processing takes longer than 10 seconds,
 then then next batch's jobs will stay queued.
 This can be changed with an experimental Spark property
 "spark.streaming.concurrentJobs" which is by default set to 1. Its not
 currently documented (maybe I should add it).
 The reason it is set to 1 is that concurrent jobs can potentially lead
 to weird sharing of resources and which can make it hard to debug the
 whether there is sufficient resources in the system to process the ingested
 data fast enough. With only 1 job running at a time, it is easy to see that
 if batch processing time < batch interval, then the system will be stable.
 Granted that this may not be the most efficient use of resources under
 certain conditions. We definitely hope to improve this in the future.
>>>
>>>
>>> Copied from TD's answer written in SO
>>> 
>>> .
>>>
>>> Non-receiver based streaming for example you can say are the fileStream,
>>> directStream ones. You can read a bit of information from here
>>> https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, May 19, 2015 at 2:13 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 Thanks Akhil.
 When I don't  set spark.streaming.concurrentJobs to true. Will the all
 pending jobs starts one by one after 1 jobs completes,or it does not
 creates jobs which could not be started at its desired interval.

 And Whats the difference and usage of Receiver vs non-receiver based
 streaming. Is there any documentation for that?

 On Tue, May 19, 2015 at 1:35 PM, Akhil Das 
 wrote:

> It will be a single job running at a time by default (you can also
> configure the spark.streaming.concurrentJobs to run jobs parallel which is
> not recommended to put in production).
>
> Now, your batch duration being 1 sec and processing time being 2
> minutes, if you are using a receiver based streaming then ideally those
> receivers will keep on receiving data while the job is running (which will
> accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
> block not found exceptions as spark drops some blocks which are yet to
> process to accumulate new blocks). If you are using a non-receiver based
> approach, you will not have this problem of dropping blocks.
>
> Ideally, if your data is small and you have enough memory to hold your
> data then it will run smoothly without any issues.
>
> Thanks
> Best Regards
>
> On Tue, May 19, 2015 at 1:23 PM, Shushant Arora <
> shushantaror...@gmail.com> wrote:
>
>> What happnes if in a streaming application one job is not yet
>> finished and stream interval reaches. Does it starts next job or wait for
>> first 

Re: Decision tree: categorical variables

2015-05-19 Thread Ram Sriharsha
Hi Keerthi

As Xiangrui mentioned in his reply, the categorical variables are assumed
to be encoded as integers between 0 and k - 1, where k is the number of
categories you pass for that feature in the category info map. So you will
need to handle this during parsing (your columns 3 and 6 need to be
converted into ints in the right range).

Ram
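
A minimal sketch of that encoding plus the categoricalFeaturesInfo map with
the MLlib decision tree API; the category values, feature positions, label
and tree settings are illustrative, and trainingData is an assumed
RDD[LabeledPoint] built from the parsed rows:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.DecisionTree

// Illustrative lookup built during parsing: raw category string -> index in [0, k-1]
val col3Categories = Map("red" -> 0.0, "green" -> 1.0, "blue" -> 2.0, "other" -> 3.0)

// One parsed row: label, then features where positions 3 and 6 carry category indices
val example = LabeledPoint(1.0,
  Vectors.dense(0.5, 1.2, 3.3, col3Categories("green"), 7.0, 2.1, 4.0))

// feature index -> number of categories k; all other features are treated as continuous
val categoricalFeaturesInfo = Map(3 -> 4, 6 -> 7)

val model = DecisionTree.trainClassifier(trainingData, 2, categoricalFeaturesInfo,
  "gini", 5, 32)   // numClasses, impurity, maxDepth, maxBins (maxBins >= largest k)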

On Tue, May 19, 2015 at 5:45 AM, Keerthi 
wrote:

> Hi ,
>
> can you pls share how you resolved the parsing issue. It would be of great
> help...
>
> Thanks.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-categorical-variables-tp12433p22943.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Mesos Spark Tasks - Lost

2015-05-19 Thread Panagiotis Garefalakis
Hello all,

I am facing a weird issue for the last couple of days running Spark on top
of Mesos and I need your help. I am running Mesos in a private cluster and
managed to successfully deploy HDFS, Cassandra, Marathon and Play, but
Spark is not working for some reason. So far I have tried:
different Java versions (1.6 and 1.7, Oracle and OpenJDK), different
spark-env configurations, different Spark versions (from 0.8.8 to 1.3.1),
different HDFS versions (Hadoop 5.1 and 4.6), and updating pom dependencies.

More specifically, while local tasks complete fine, in cluster mode all the
tasks get lost (both using spark-shell and spark-submit).
From the worker log I see something like this:

---
I0519 02:36:30.475064 12863 fetcher.cpp:214] Fetching URI
'hdfs:/:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz'
I0519 02:36:30.747372 12863 fetcher.cpp:99] Fetching URI
'hdfs://X:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' using Hadoop
Client
I0519 02:36:30.747546 12863 fetcher.cpp:109] Downloading resource from
'hdfs://:8020/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz' to
'/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz'
I0519 02:36:34.205878 12863 fetcher.cpp:78] Extracted resource
'/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3/spark-1.1.0-bin-2.0.0-cdh4.7.0.tgz'
into
'/tmp/mesos/slaves/20150515-164602-2877535122-5050-32131-S2/frameworks/20150517-162701-2877535122-5050-28705-0084/executors/20150515-164602-2877535122-5050-32131-S2/runs/660d78ec-e2f4-4d38-881b-7209cbd3c5c3'
*Error: Could not find or load main class two*

---

And from the Spark Terminal:

---
15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Cancelling stage 0
15/05/19 02:36:39 INFO scheduler.TaskSchedulerImpl: Stage 0 was cancelled
15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at
SparkPi.scala:35
15/05/19 02:36:39 INFO scheduler.DAGScheduler: Failed to run reduce at
SparkPi.scala:35
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 7 in stage 0.0 failed 4 times, most recent failure:
Lost task 7.3 in stage 0.0 (TID 26, ): ExecutorLostFailure
(executor lost)
Driver stacktrace: at
org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)atorg.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
..
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

---

Any help will be greatly appreciated!

Regards,
Panagiotis


Windows DOS bug in windows-utils.cmd

2015-05-19 Thread Justin Pihony
When running something like this:

spark-shell --jars foo.jar,bar.jar

This keeps failing to include the tail of the jars list. Digging into the
launch scripts I found that the comma makes it so that the list was sent as
separate parameters. So, to keep things together, I tried 

spark-shell --jars "foo.jar, bar.jar"

But, this still failed as the quotes carried over into some of the string
checks and resulted in invalid character errors. So, I am curious if anybody
sees a problem with making a PR to fix the script from

...
if "x%2"=="x" (
  echo "%1" requires an argument. >&2
  exit /b 1
)
set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1 %2
...

TO

...
if "x%~2"=="x" (
  echo "%1" requires an argument. >&2
  exit /b 1
)
set SUBMISSION_OPTS=%SUBMISSION_OPTS% %1 %~2
...

The only difference is the use of the tilde to remove any surrounding quotes
if there are some. 

I figured I would ask here first to vet any unforeseen bugs this might cause
in other systems. As far as I know this should be harmless and only make it
so that comma separated lists will work in DOS.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Windows-DOS-bug-in-windows-utils-cmd-tp22946.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark streaming doubt

2015-05-19 Thread Akhil Das
On Tue, May 19, 2015 at 8:10 PM, Shushant Arora 
wrote:

> So for Kafka+spark streaming, Receiver based streaming used highlevel api
> and non receiver based streaming used low level api.
>
> 1.In high level receiver based streaming does it registers consumers at
> each job start(whenever a new job is launched by streaming application say
> at each second)?
>

-> Receiver-based streaming will always have the receiver running in parallel
while your job is running, so by default every 200ms
(spark.streaming.blockInterval) the receiver will generate a block of data
which is read from Kafka.


> 2.No of executors in highlevel receiver based jobs will always equal to no
> of partitions in topic ?
>

-> Not sure where you came up with this. For the non-receiver-based
one, I think the number of partitions in Spark will be equal to the number
of Kafka partitions for the given topic.


> 3.Will data from a single topic be consumed by executors in parllel or
> only one receiver consumes in multiple threads and assign to executors in
> high level receiver based approach ?
>
-> They will consume the data in parallel. For the receiver-based approach,
you can actually specify the number of receivers that you want to spawn for
consuming the messages.
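
A minimal sketch of the two consumption styles contrasted above; the ZooKeeper
and broker addresses, topic name, thread count and receiver count are
illustrative:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("KafkaConsumptionSketch")
val ssc = new StreamingContext(conf, Seconds(1))

// Receiver-based (high-level consumer API): spawn several receivers and union them
val receiverStreams = (1 to 3).map { _ =>
  KafkaUtils.createStream(ssc, "zk-host:2181", "my-consumer-group", Map("my-topic" -> 2))
}
val unioned = ssc.union(receiverStreams)

// Direct, receiver-less (low-level API): one Spark partition per Kafka partition
val direct = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, Map("metadata.broker.list" -> "broker:9092"), Set("my-topic"))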

>
>
>
> On Tue, May 19, 2015 at 2:38 PM, Akhil Das 
> wrote:
>
>> spark.streaming.concurrentJobs takes an integer value, not boolean. If
>> you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
>> next job will start once it completes the current one.
>>
>>
>>> Actually, in the current implementation of Spark Streaming and under
>>> default configuration, only job is active (i.e. under execution) at any
>>> point of time. So if one batch's processing takes longer than 10 seconds,
>>> then then next batch's jobs will stay queued.
>>> This can be changed with an experimental Spark property
>>> "spark.streaming.concurrentJobs" which is by default set to 1. Its not
>>> currently documented (maybe I should add it).
>>> The reason it is set to 1 is that concurrent jobs can potentially lead
>>> to weird sharing of resources and which can make it hard to debug the
>>> whether there is sufficient resources in the system to process the ingested
>>> data fast enough. With only 1 job running at a time, it is easy to see that
>>> if batch processing time < batch interval, then the system will be stable.
>>> Granted that this may not be the most efficient use of resources under
>>> certain conditions. We definitely hope to improve this in the future.
>>
>>
>> Copied from TD's answer written in SO
>> 
>> .
>>
>> Non-receiver based streaming for example you can say are the fileStream,
>> directStream ones. You can read a bit of information from here
>> https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html
>>
>> Thanks
>> Best Regards
>>
>> On Tue, May 19, 2015 at 2:13 PM, Shushant Arora <
>> shushantaror...@gmail.com> wrote:
>>
>>> Thanks Akhil.
>>> When I don't  set spark.streaming.concurrentJobs to true. Will the all
>>> pending jobs starts one by one after 1 jobs completes,or it does not
>>> creates jobs which could not be started at its desired interval.
>>>
>>> And Whats the difference and usage of Receiver vs non-receiver based
>>> streaming. Is there any documentation for that?
>>>
>>> On Tue, May 19, 2015 at 1:35 PM, Akhil Das 
>>> wrote:
>>>
 It will be a single job running at a time by default (you can also
 configure the spark.streaming.concurrentJobs to run jobs parallel which is
 not recommended to put in production).

 Now, your batch duration being 1 sec and processing time being 2
 minutes, if you are using a receiver based streaming then ideally those
 receivers will keep on receiving data while the job is running (which will
 accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
 block not found exceptions as spark drops some blocks which are yet to
 process to accumulate new blocks). If you are using a non-receiver based
 approach, you will not have this problem of dropping blocks.

 Ideally, if your data is small and you have enough memory to hold your
 data then it will run smoothly without any issues.

 Thanks
 Best Regards

 On Tue, May 19, 2015 at 1:23 PM, Shushant Arora <
 shushantaror...@gmail.com> wrote:

> What happnes if in a streaming application one job is not yet finished
> and stream interval reaches. Does it starts next job or wait for first to
> finish and rest jobs will keep on accumulating in queue.
>
>
> Say I have a streaming application with stream interval of 1 sec, but
> my job takes 2 min to process 1 sec stream , what will happen ?  At any
> time there will be only one job running or multiple ?
>
>

>>>
>>
>


Multi user setup and saving a DataFrame / RDD to a network exported file system

2015-05-19 Thread Tomasz Fruboes

Dear Experts,

 we have a Spark cluster (standalone mode) in which the master and workers 
are started from the root account. Everything runs correctly up to the point 
where we try operations such as


dataFrame.select("name", "age").save(ofile, "parquet")

or

rdd.saveAsPickleFile(ofile)

, where ofile is a path on a network-exported filesystem (visible on all 
nodes; in our case this is Lustre, and I guess the effect on NFS would be similar).


 Unsurprisingly temp files created on workers are owned by root, which 
then leads to a crash (see [1] below). Is there a solution/workaround 
for this (e.g. controlling file creation mode of the temporary files)?


Cheers,
 Tomasz


ps I've tried to google this problem; there are a couple of similar reports, 
but no clear answer/solution found


ps2 For completeness - running the master/workers as a regular user solves 
the problem only for that user. For other users submitting to this 
master, the result is given in [2] below



[0] Cluster details:
Master/workers: centos 6.5
Spark 1.3.1 prebuilt for hadoop 2.4 (same behaviour for the 2.6 build)


[1]
##
   File 
"/mnt/home/tfruboes/2015.05.SparkLocal/spark-1.3.1-bin-hadoop2.4/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", 
line 300, in get_return_value

py4j.protocol.Py4JJavaError: An error occurred while calling o27.save.
: java.io.IOException: Failed to rename 
DeprecatedRawLocalFileStatus{path=file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/_temporary/0/task_201505191540_0009_r_01/part-r-2.parquet; 
isDirectory=false; length=534; replication=1; blocksize=33554432; 
modification_time=1432042832000; access_time=0; owner=; group=; 
permission=rw-rw-rw-; isSymlink=false} to 
file:/mnt/lustre/bigdata/med_home/tmp/test19EE/namesAndAges.parquet2/part-r-2.parquet
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:346)
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:362)
at 
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:310)
at 
parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:43)
at 
org.apache.spark.sql.parquet.ParquetRelation2.insert(newParquet.scala:690)
at 
org.apache.spark.sql.parquet.DefaultSource.createRelation(newParquet.scala:129)
at 
org.apache.spark.sql.sources.ResolvedDataSource$.apply(ddl.scala:240)

at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1196)
at org.apache.spark.sql.DataFrame.save(DataFrame.scala:1181)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:606)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at 
py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)

at py4j.Gateway.invoke(Gateway.java:259)
at 
py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)

at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
##



[2]
##
15/05/19 14:45:19 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 
3, wn23023.cis.gov.pl): java.io.IOException: Mkdirs failed to create 
file:/mnt/lustre/bigdata/med_home/tmp/test18/namesAndAges.parquet2/_temporary/0/_temporary/attempt_201505191445_0009_r_00_0
	at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:438)
	at 
org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:424)

at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:906)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:887)
at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:784)
at parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:154)
	at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:279)
	at 
parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
	at 
org.apache.spark.sql.parquet.ParquetRelation2.org$apache$spark$sql$parquet$ParquetRelation2$$writeShard$1(newParquet.scala:667)
	at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)
	at 
org.apache.spark.sql.parquet.ParquetRelation2$$anonfun$insert$2.apply(newParquet.scala:689)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at org.apache

Re: PySpark Job throwing IOError

2015-05-19 Thread Muralidhar, Nikhil
Hello all,
  I have an error in pyspark for which I have not the faintest idea of the 
cause. All I can tell from the stack trace is that it can't find a pyspark file 
on the path /mnt/spark-*/pyspark-*. Apart from that I need someone more  
experienced than me with Spark to look into it and help diagnose the problem 
and suggest potential solutions, hence I am looking to this group for help.


If anyone wants to read the same question on Stack Overflow here is the link:  
http://stackoverflow.com/questions/30328104/pyspark-job-throwing-ioerror

Here's the same thing pasted as raw text:


I am trying to write a simple KNN job using pyspark on an HDFS cluster. I am 
using very few input files to perform the job, so I don't think it's a memory 
(space) issue. I do not do a broadcast in any part of my code, so it is surprising to 
me that broadcast.py fails. I do, however, have Python dictionaries that I 
keep in shared memory without explicitly doing a broadcast.

Can anyone help me understand what is going on?

I have appended my python file and the stack trace to this email.


Thanks,

Nikhil

from pyspark.mllib.linalg import SparseVector
from pyspark import SparkContext
import glob
import sys
import time
import subprocess
from itertools import combinations
"""We create user and item indices starting from 0 to #users and 0 to #items respectively. This is done to store them in sparseVectors as dicts."""
def create_indices(inputdir):
    items=dict()
    user_id_to_idx=dict()
    user_idx_to_id=dict()
    item_idx_to_id=dict()
    item_id_to_idx=dict()
    item_idx=0
    user_idx=0

    cat=subprocess.Popen(["hadoop","fs","-cat","/user/hadoop/"+inputdir+"/*.txt"],stdout=subprocess.PIPE)
    for line in cat.stdout:
        toks=map(str,line.strip().split("\t"))
        try:
            user_id_to_idx[toks[1].strip()]
        except KeyError:
            if toks[1].strip()!=None:
                user_id_to_idx[toks[1].strip()]=user_idx
                user_idx_to_id[user_idx]=toks[1].strip()
                user_idx+=1
        try:
            item_id_to_idx[toks[0].strip()]
        except KeyError:
            if toks[0].strip()!=None:
                item_id_to_idx[toks[0].strip()]=item_idx
                item_idx_to_id[item_idx]=toks[0].strip()
                item_idx+=1
    return user_idx_to_id,user_id_to_idx,item_idx_to_id,item_id_to_idx,user_idx,item_idx

def concat_helper(a,b):
    if(a!= None and b!=None):
        print a,b,a.update(b)
        temp=dict()
        temp.update(a)
        temp.update(b)
        return temp
    elif a!=None:
        return a
    elif b!=None:
        return b

# pass in the hdfs path to the input files and the spark context.
def runKNN(inputdir,sc,user_id_to_idx,item_id_to_idx):
    rdd_text=sc.textFile(inputdir)
    try:
        new_rdd = rdd_text.map(lambda x: (item_id_to_idx[str(x.strip().split("\t")[0])],{user_id_to_idx[str(x.strip().split("\t")[1])]:1})).reduceByKey(lambda x,y: concat_helper(x,y)).sortByKey()
    except KeyError:
        print item_id_to_idx.keys()
        pass
    return new_rdd

if __name__=="__main__":
    sc = SparkContext()
    u_idx_to_id,u_id_to_idx,i_idx_to_id,i_id_to_idx,u_idx,i_idx=create_indices(sys.argv[1])

    u_idx_to_id_b=sc.broadcast(u_idx_to_id)
    u_id_to_idx_b=sc.broadcast(u_id_to_idx)
    i_idx_to_idx_b=sc.broadcast(i_idx_to_id)
    i_id_to_idx_b=sc.broadcast(i_id_to_idx)
    num_users=sc.broadcast(u_idx)
    num_items=sc.broadcast(i_idx)
    item_dict_rdd=runKNN(sys.argv[1],sc,u_id_to_idx,i_id_to_idx)

    item_dict_rdd_new=item_dict_rdd.map(lambda x: (x[0],SparseVector(i_idx,x[1])))
    item_dict_rdd_new.saveAsTextFile("hdfs://output_path")
    #dot_products_rdd=map(lambda (x,y): (x,y),combinations(item_dict_rdd_new.map(lambda x: x),2))
    dot_products_rdd.saveAsTextFile("hdfs://output_path_2")

stacktrace
Description: stacktrace

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org

Does Python 2.7 have to be installed on every cluster node?

2015-05-19 Thread YaoPau
We're running Python 2.6.6 here but we're looking to upgrade to 2.7.x  in a
month.  

Does pyspark work by converting Python into Java Bytecode, or does it run
Python natively?  

And along those lines, if we're running in yarn-client mode, would we have
to upgrade just the edge node version of Python, or every node in the
cluster?



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Does-Python-2-7-have-to-be-installed-on-every-cluster-node-tp22945.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: spark streaming doubt

2015-05-19 Thread Shushant Arora
So for Kafka + Spark Streaming, receiver-based streaming uses the high-level
API and non-receiver-based streaming uses the low-level API.

1. In high-level receiver-based streaming, does it register consumers at
each job start (whenever a new job is launched by the streaming application,
say each second)?
2. Will the number of executors in high-level receiver-based jobs always
equal the number of partitions in the topic?
3. Will data from a single topic be consumed by executors in parallel, or
does only one receiver consume in multiple threads and assign to executors
in the high-level receiver-based approach?




On Tue, May 19, 2015 at 2:38 PM, Akhil Das 
wrote:

> spark.streaming.concurrentJobs takes an integer value, not boolean. If
> you set it as 2 then 2 jobs will run parallel. Default value is 1 and the
> next job will start once it completes the current one.
>
>
>> Actually, in the current implementation of Spark Streaming and under
>> default configuration, only job is active (i.e. under execution) at any
>> point of time. So if one batch's processing takes longer than 10 seconds,
>> then then next batch's jobs will stay queued.
>> This can be changed with an experimental Spark property
>> "spark.streaming.concurrentJobs" which is by default set to 1. Its not
>> currently documented (maybe I should add it).
>> The reason it is set to 1 is that concurrent jobs can potentially lead to
>> weird sharing of resources and which can make it hard to debug the whether
>> there is sufficient resources in the system to process the ingested data
>> fast enough. With only 1 job running at a time, it is easy to see that if
>> batch processing time < batch interval, then the system will be stable.
>> Granted that this may not be the most efficient use of resources under
>> certain conditions. We definitely hope to improve this in the future.
>
>
> Copied from TD's answer written in SO
> 
> .
>
> Non-receiver based streaming for example you can say are the fileStream,
> directStream ones. You can read a bit of information from here
> https://spark.apache.org/docs/1.3.1/streaming-kafka-integration.html
>
> Thanks
> Best Regards
>
> On Tue, May 19, 2015 at 2:13 PM, Shushant Arora  > wrote:
>
>> Thanks Akhil.
>> When I don't  set spark.streaming.concurrentJobs to true. Will the all
>> pending jobs starts one by one after 1 jobs completes,or it does not
>> creates jobs which could not be started at its desired interval.
>>
>> And Whats the difference and usage of Receiver vs non-receiver based
>> streaming. Is there any documentation for that?
>>
>> On Tue, May 19, 2015 at 1:35 PM, Akhil Das 
>> wrote:
>>
>>> It will be a single job running at a time by default (you can also
>>> configure the spark.streaming.concurrentJobs to run jobs parallel which is
>>> not recommended to put in production).
>>>
>>> Now, your batch duration being 1 sec and processing time being 2
>>> minutes, if you are using a receiver based streaming then ideally those
>>> receivers will keep on receiving data while the job is running (which will
>>> accumulate in memory if you set StorageLevel as MEMORY_ONLY and end up in
>>> block not found exceptions as spark drops some blocks which are yet to
>>> process to accumulate new blocks). If you are using a non-receiver based
>>> approach, you will not have this problem of dropping blocks.
>>>
>>> Ideally, if your data is small and you have enough memory to hold your
>>> data then it will run smoothly without any issues.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Tue, May 19, 2015 at 1:23 PM, Shushant Arora <
>>> shushantaror...@gmail.com> wrote:
>>>
 What happnes if in a streaming application one job is not yet finished
 and stream interval reaches. Does it starts next job or wait for first to
 finish and rest jobs will keep on accumulating in queue.


 Say I have a streaming application with stream interval of 1 sec, but
 my job takes 2 min to process 1 sec stream , what will happen ?  At any
 time there will be only one job running or multiple ?


>>>
>>
>


Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Another update: when run on more than 1000 columns I am getting

Could not write class
__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d/__wrapper$1$40255d281a0d4eacab06bcad6cf89b0d$$anonfun$wrapper$1$$anon$1
because it exceeds JVM code size limits. Method apply's code too large!






Regards,
Madhukara Phatak
http://datamantra.io/
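
For context, a sketch of the kind of generated query this thread is describing;
the per-column expression is the one quoted further down, df / sqlContext /
my_table are assumed to exist, and aggregateStats is the thread's own UDF
(assumed already registered):

// One aggregate expression per column, then a single very wide SELECT over all of them
val exprs = df.columns.map { c =>
  s"""aggregateStats("$c", max(cast($c as double)), min(cast($c as double)),
     |  avg(cast($c as double)), count(*))""".stripMargin
}
val stats = sqlContext.sql(s"SELECT ${exprs.mkString(", ")} FROM my_table")

With hundreds to tens of thousands of columns, this single SELECT is what the
analyzer has to work through, which is where the long planning times and the
generated-code size limit reported above come from.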

On Tue, May 19, 2015 at 6:23 PM, madhu phatak  wrote:

> Hi,
> Tested with HiveContext also. It also take similar amount of time.
>
> To make things clear, the following is the select clause generated for a given column:
>
> aggregateStats("$columnName", max(cast($columnName as double)),
>   min(cast($columnName as double)), avg(cast($columnName as double)), count(*))
>
> aggregateStats is a UDF generating a case class to hold the values.
>
>
>
>
>
>
>
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>
> On Tue, May 19, 2015 at 5:57 PM, madhu phatak 
> wrote:
>
>> Hi,
>> Tested for calculating values for 300 columns. Analyser takes around 4
>> minutes to generate the plan. Is this normal?
>>
>>
>>
>>
>> Regards,
>> Madhukara Phatak
>> http://datamantra.io/
>>
>> On Tue, May 19, 2015 at 4:35 PM, madhu phatak 
>> wrote:
>>
>>> Hi,
>>> I am using spark 1.3.1
>>>
>>>
>>>
>>>
>>> Regards,
>>> Madhukara Phatak
>>> http://datamantra.io/
>>>
>>> On Tue, May 19, 2015 at 4:34 PM, Wangfei (X) 
>>> wrote:
>>>
  And which version are you using

 Sent from my iPhone

 On May 19, 2015, at 18:29, "ayan guha"  wrote:

   can you kindly share your code?

 On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
 wrote:

> Hi,
> I  am trying run spark sql aggregation on a file with 26k columns. No
> of rows is very small. I am running into issue that spark is taking huge
> amount of time to parse the sql and create a logical plan. Even if i have
> just one row, it's taking more than 1 hour just to get pass the parsing.
> Any idea how to optimize in these kind of scenarios?
>
>
>  Regards,
>  Madhukara Phatak
> http://datamantra.io/
>



  --
 Best Regards,
 Ayan Guha


>>>
>>
>


Re: How to use spark to access HBase with Security enabled

2015-05-19 Thread Ted Yu
Please take a look at:
http://hbase.apache.org/book.html#_client_side_configuration_for_secure_operation

Cheers
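
For what it's worth, a hedged sketch of the client-side Kerberos settings that
chapter describes, as they might be applied in the HBaseTest-style code quoted
below; the principal and keytab path are placeholders, and this only covers
client configuration, not distributing credentials to the executors:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.security.UserGroupInformation

val conf = HBaseConfiguration.create()   // picks up hbase-site.xml if it is on the classpath
conf.set("hadoop.security.authentication", "kerberos")
conf.set("hbase.security.authentication", "kerberos")
UserGroupInformation.setConfiguration(conf)
UserGroupInformation.loginUserFromKeytab("spark-user@YOUR.REALM", "/path/to/spark-user.keytab")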

On Tue, May 19, 2015 at 5:23 AM, donhoff_h <165612...@qq.com> wrote:

>
> The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my
> spark programs. I am sure I have run the kinit command to make it take
> effect. And I also used the HBase Shell to verify that this user has the
> right to scan and put the tables in HBase.
>
> Now I still have no idea how to solve this problem. Can anybody help me to
> figure it out? Many Thanks!
>
> -- Original message --
> *From:* "yuzhihong";;
> *Sent:* Tuesday, May 19, 2015, 7:55 PM
> *To:* "donhoff_h"<165612...@qq.com>;
> *Cc:* "user";
> *Subject:* Re: How to use spark to access HBase with Security enabled
>
> Which user did you run your program as ?
>
> Have you granted proper permission on hbase side ?
>
> You should also check master log to see if there was some clue.
>
> Cheers
>
>
>
> On May 19, 2015, at 2:41 AM, donhoff_h <165612...@qq.com> wrote:
>
> Hi, experts.
>
> I ran the "HBaseTest" program which is an example from the Apache Spark
> source code to learn how to use spark to access HBase. But I met the
> following exception:
> Exception in thread "main"
> org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after
> attempts=36, exceptions:
> Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException:
> callTimeout=6, callDuration=68648: row 'spark_t01,,00' on
> table 'hbase:meta' at region=hbase:meta,,1.1588230740,
> hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0
>
> I also checked the RegionServer Log of the host "bgdt01.dev.hrb" listed in
> the above exception. I found a few entries like the following one:
> 2015-05-19 16:59:11,143 DEBUG
> [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer:
> RpcServer.listener,port=16020: Caught exception while
> reading:Authentication is required
>
> The above entry did not point to my program clearly. But the time is very
> near. Since my hbase version is HBase1.0.0 and I set security enabled, I
> doubt the exception was caused by the Kerberos authentication.  But I am
> not sure.
>
> Do anybody know if my guess is right? And if I am right, could anybody
> tell me how to set Kerberos Authentication in a spark program? I don't know
> how to do it. I already checked the API doc , but did not found any API
> useful. Many Thanks!
>
> By the way, my spark version is 1.3.0. I also paste the code of
> "HBaseTest" in the following:
> ***Source Code**
> object HBaseTest {
>   def main(args: Array[String]) {
> val sparkConf = new SparkConf().setAppName("HBaseTest")
> val sc = new SparkContext(sparkConf)
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE, args(0))
>
> // Initialize hBase table if necessary
> val admin = new HBaseAdmin(conf)
> if (!admin.isTableAvailable(args(0))) {
>   val tableDesc = new HTableDescriptor(args(0))
>   admin.createTable(tableDesc)
> }
>
> val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
>
> hBaseRDD.count()
>
> sc.stop()
>   }
> }
>
>


Re: org.apache.spark.shuffle.FetchFailedException :: Migration from Spark 1.2 to 1.3

2015-05-19 Thread Imran Rashid
The error you are posting is from attempt 6 for that stage ("stage 1.6").
I've found those files can get corrupted when a stage gets retried and
there are multiple attempts.  Hopefully that will get fixed soon
(SPARK-7308), but in the meantime, I'd look further back in your logs to
figure out why attempt 0 failed.  The original error is certainly not
caused by SPARK-7308 -- maybe it will be a from a problem that is easier to
solve.

So I'd suggest trying to find that error, and if you are stuck figuring out
its cause, post here again with the logs from that error.

On Mon, May 18, 2015 at 12:19 PM, zia_kayani 
wrote:

> Hi, I'm getting this exception after shifting my code from Spark 1.2 to
> Spark
> 1.3
>
> 15/05/18 18:22:39 WARN TaskSetManager: Lost task 0.0 in stage 1.6 (TID 84,
> cloud8-server): FetchFailed(BlockManagerId(1, cloud4-server, 7337),
> shuffleId=0, mapId=9, reduceId=1, message=
> org.apache.spark.shuffle.FetchFailedException: java.lang.RuntimeException:
> Failed to open file:
>
> /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
> at
>
> org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:202)
> at
>
> org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getBlockData(ExternalShuffleBlockManager.java:112)
> at
>
> org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.receive(ExternalShuffleBlockHandler.java:74)
> at
>
> org.apache.spark.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:124)
> at
>
> org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:97)
> at
>
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:91)
> at
>
> org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:44)
> at
>
> io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
>
> io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
>
> io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:163)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:333)
> at
>
> io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:319)
> at
>
> io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:787)
> at
>
> io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:130)
> at
> io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
> at
>
> io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
> at
>
> io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
> at
>
> io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:116)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException:
>
> /tmp/spark-fff63849-a318-4e48-bdea-2f563076ad5d/spark-40ba3a41-0f4d-446e-b806-e788e210d394/spark-a3d61f7a-22e9-4b3b-9346-ff3b70d0e43d/blockmgr-0e3b2b5d-f677-4e91-b98b-ed913adbd15f/39/shuffle_0_9_0.index
> (Permission denied)
> at java.io.FileInputStream.open(Native Method)
> at java.io.FileInputStream.<init>(FileInputStream.java:146)
> at
>
> org.apache.spark.network.shuffle.ExternalShuffleBlockManager.getSortBasedShuffleBlockData(ExternalShuffleBlockManager.java:191)
> ... 23 more
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/org-apache-spark-shuffle-FetchFailedException-Migration-from-Spark-1-2-to-1-3-tp22937.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>
> -
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
>


Hive in IntelliJ

2015-05-19 Thread Heisenberg Bb
I was trying to implement this example:
http://spark.apache.org/docs/1.3.1/sql-programming-guide.html#hive-tables

It worked well when I built Spark in the terminal using the command specified here:
http://spark.apache.org/docs/1.3.1/building-spark.html#building-with-hive-and-jdbc-support

But when I try the same in IntelliJ, following the setup specified here:
https://cwiki.apache.org/confluence/display/SPARK/Useful+Developer+Tools#UsefulDeveloperTools-IntelliJ

It throws the error:
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
:21: error: object hive is not a member of package
org.apache.spark.sql

Can anyone help me get through this issue?

Regards
Akhil
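
One common cause is simply that the hive module is not on the IDE classpath:
when importing Spark itself this usually means enabling the hive Maven profile
in IntelliJ, and when building your own application it means depending on the
spark-hive artifact. A build.sbt sketch for the application case, with the
version chosen to match the 1.3.1 docs linked above (adjust to your build):

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.3.1",
  "org.apache.spark" %% "spark-sql"  % "1.3.1",
  "org.apache.spark" %% "spark-hive" % "1.3.1"  // provides org.apache.spark.sql.hive.HiveContext
)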


Re: Broadcast variables can be rebroadcast?

2015-05-19 Thread Imran Rashid
hmm, I guess it depends on the way you look at it.  In a way, I'm saying
that spark does *not* have any built in "auto-re-broadcast" if you try to
mutate a broadcast variable.  Instead, you should create something new, and
just broadcast it separately.  Then just have all the code you have
operating on your RDDs look at the new broadcast variable.

But I guess there is another way to look at it -- you are creating new
broadcast variables each time, but they all point to the same underlying
mutable data structure.  So in a way, you are "rebroadcasting" the same
underlying data structure.

Let me expand my example from earlier a little bit more:


def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit = {
 ...
}

// this is a val, because the data structure itself is mutable
val myMutableDataStructure = ...
// this is a var, because you will create new broadcasts
var myBroadcast = sc.broadcast(myMutableDataStructure)
(0 to 20).foreach { iteration =>
  oneIteration(myRDD, myBroadcast)
  // update your mutable data structure in place
  myMutableDataStructure.update(...)
  // ... but that doesn't affect the broadcast variables living out on the
cluster, so we need to
  // create a new one

  // this line is not required -- the broadcast var will automatically get
unpersisted when a gc
  // cleans up the old broadcast on the driver, but I'm including this here
for completeness,
  // in case you want to more proactively clean up old blocks if you are
low on space
  myBroadcast.unpersist()

  // now we create a new broadcast which has the updated data in our
mutable data structure
  myBroadcast = sc.broadcast(myMutableDataStructure)
}


hope this clarifies things!

Imran
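
As a purely illustrative filling-in of the oneIteration stub above (types and
logic are assumptions), the tasks read the current broadcast through .value:

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD

def oneIteration(myRDD: RDD[String], myBroadcastVar: Broadcast[Map[String, Int]]): Unit = {
  // every task reads the lookup table that this particular broadcast carries
  val hits = myRDD.filter(line => myBroadcastVar.value.contains(line)).count()
  println(s"records found in lookup table this iteration: $hits")
}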

On Tue, May 19, 2015 at 3:06 AM, N B  wrote:

> Hi Imran,
>
> If I understood you correctly, you are suggesting to simply call broadcast
> again from the driver program. This is exactly what I am hoping will work
> as I have the Broadcast data wrapped up and I am indeed (re)broadcasting
> the wrapper over again when the underlying data changes. However,
> documentation seems to suggest that one cannot re-broadcast. Is my
> understanding accurate?
>
> Thanks
> NB
>
>
> On Mon, May 18, 2015 at 6:24 PM, Imran Rashid 
> wrote:
>
>> Rather than "updating" the broadcast variable, can't you simply create a
>> new one?  When the old one can be gc'ed in your program, it will also get
>> gc'ed from spark's cache (and all executors).
>>
>> I think this will make your code *slightly* more complicated, as you need
>> to add in another layer of indirection for which broadcast variable to use,
>> but not too bad.  Eg., from
>>
>> var myBroadcast = sc.broadcast( ...)
>> (0 to 20).foreach{ iteration =>
>>   //  ... some rdd operations that involve myBroadcast ...
>>   myBroadcast.update(...) // wrong! dont' update a broadcast variable
>> }
>>
>> instead do something like:
>>
>> def oneIteration(myRDD: RDD[...], myBroadcastVar: Broadcast[...]): Unit =
>> {
>>  ...
>> }
>>
>> var myBroadcast = sc.broadcast(...)
>> (0 to 20).foreach { iteration =>
>>   oneIteration(myRDD, myBroadcast)
>>   myBroadcast = sc.broadcast(...) // create a NEW broadcast here,
>> with whatever you need to update it
>> }
>>
>> On Sat, May 16, 2015 at 2:01 AM, N B  wrote:
>>
>>> Thanks Ayan. Can we rebroadcast after updating in the driver?
>>>
>>> Thanks
>>> NB.
>>>
>>>
>>> On Fri, May 15, 2015 at 6:40 PM, ayan guha  wrote:
>>>
 Hi

 broadcast variables are shipped for the first time it is accessed in a
 transformation to the executors used by the transformation. It will NOT
 updated subsequently, even if the value has changed. However, a new value
 will be shipped to any new executor comes into play after the value has
 changed. This way, changing value of broadcast variable is not a good idea
 as it can create inconsistency within cluster. From documentatins:

  In addition, the object v should not be modified after it is
 broadcast in order to ensure that all nodes get the same value of the
 broadcast variable


 On Sat, May 16, 2015 at 10:39 AM, N B  wrote:

> Thanks Ilya. Does one have to call broadcast again once the underlying
> data is updated in order to get the changes visible on all nodes?
>
> Thanks
> NB
>
>
> On Fri, May 15, 2015 at 5:29 PM, Ilya Ganelin 
> wrote:
>
>> The broadcast variable is like a pointer. If the underlying data
>> changes then the changes will be visible throughout the cluster.
>> On Fri, May 15, 2015 at 5:18 PM NB  wrote:
>>
>>> Hello,
>>>
>>> Once a broadcast variable is created using sparkContext.broadcast(),
>>> can it
>>> ever be updated again? The use case is for something like the
>>> underlying
>>> lookup data changing over time.
>>>
>>> Thanks
>>> NB
>>>
>>>
>>>
>>>
>>> --
>>> View this message in context:
>>> http://apache-spark-user-list.10015

Re: Reading Real Time Data only from Kafka

2015-05-19 Thread Akhil Das
Cool. Thanks for the detailed response Cody.

Thanks
Best Regards
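
Pulling the knobs discussed below into one sketch; the broker list, topic name,
batch interval and rate cap are illustrative:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf()
  .setAppName("DirectKafkaSketch")
  // cap on messages per second per Kafka partition for each batch
  .set("spark.streaming.kafka.maxRatePerPartition", "10000")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaParams = Map(
  "metadata.broker.list" -> "broker1:9092,broker2:9092",
  // "largest" starts from the newest offsets; "smallest" replays the whole topic
  "auto.offset.reset" -> "largest")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("events"))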

On Tue, May 19, 2015 at 6:43 PM, Cody Koeninger  wrote:

> If those questions aren't answered by
>
> https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md
>
> please let me know so I can update it.
>
> If you set auto.offset.reset to largest, it will start at the largest
> offset.  Any messages before that will be skipped, so if prior runs of the
> job didn't consume them, they're lost.
>
> KafkaRDD / DirectStream doesn't make any scheduling decisions (aside from
> a locality hint if you have kafka running on the same node as spark), and
> it doesn't have any long-running receivers.  Executors get whatever
> partitions the normal scheduler decides they should get.  If an executor
> fails, a different executor reads the offset range for the failed
> partition; they're immutable, so no difference in result.
>
> Deciding where to save offsets (or not) is up to you.  You can checkpoint,
> or store them yourself.
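
As a hedged sketch of the "store them yourself" route (broker address, topic name and offsets below are invented), offset ranges saved from an earlier run can be replayed deterministically as a batch with KafkaUtils.createRDD:

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object ReplayFromStoredOffsets {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("kafka-replay-sketch"))
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

    // offset ranges you saved yourself (e.g. in a database) during an earlier run
    val offsetRanges = Array(
      OffsetRange.create("events", 0, 1000L, 2000L),
      OffsetRange.create("events", 1, 1500L, 2500L))

    // a deterministic batch over exactly those offsets
    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
    println(s"replayed ${rdd.count()} messages")
    sc.stop()
  }
}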
>
> On Mon, May 18, 2015 at 12:00 PM, Akhil Das 
> wrote:
>
>> I have played a bit with the directStream kafka api. Good work cody.
>> These are my findings and also can you clarify a few things for me (see
>> below).
>>
>> -> When "auto.offset.reset"-> "smallest" and you have 60GB of messages in
>> Kafka, it takes forever as it reads the whole 60GB at once. "largest" will
>> only read the latest messages.
>> -> To avoid this, you can actually limit the rate with
>> spark.streaming.kafka.maxRatePerPartition, which is pretty stable (Always
>> reads the same amount of data).
>> -> Number of partitions per batch = number of kafka partitions.
>>
>> -> In the case of driver failures, offset reset being set to "smallest"
>> will replay the whole messages and "largest" will only read those messages
>> which are pushed after the streaming job has started. What happens to those
>> messages which arrive in between?
>>
>> *Few things which are unclear:*
>>
>> -> If we have a kafka topic with 9 partitions, and a spark cluster with 3
>> slaves, how does it decide which slave should read from which partition?
>> And what happens if a single slave fails while reading the data?
>>
>> -> By default it doesn't push the offsets of messages which are read
>> anywhere, then how does it replay the message in case of failures?
>>
>> Thanks
>> Best Regards
>>
>> On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger 
>> wrote:
>>
>>> You linked to a google mail tab, not a public archive, so I don't know
>>> exactly which conversation you're referring to.
>>>
>>> As far as I know, streaming only runs a single job at a time in the
>>> order they were defined, unless you turn on an experimental option for more
>>> parallelism (TD or someone more knowledgeable can chime in on this).  If
>>> you're talking about the possibility of the next job starting before the
>>> prior one has fully finished, because your processing is lagging behind...
>>> I'm not 100% sure this is possible because I've never observed it.
>>>
>>> The thing is, it's a moot point, because if you're saving offsets
>>> yourself transactionally, you already need to be verifying that offsets are
>>> correct (increasing without gaps) in order to handle restarts correctly.
>>>
>>> If you're super concerned about how batches get generated, the direct
>>> api gives you access to KafkaUtils.createRDD... just schedule your own rdds
>>> in the order you want.  Again, flexible.
>>>
>>>
>>>
>>>
>>> On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya <
>>> dibyendu.bhattach...@gmail.com> wrote:
>>>
 Thanks Cody for your email. I think my concern was not about getting the
 ordering of messages within a partition, which as you said is possible if
 one knows how Spark works. The issue is how Spark schedules jobs on every
 batch, which is not in the same order they were generated. So if that is not
 guaranteed, it does not matter if you manage order within your partition, and
 depending on per-partition ordering to commit offsets may lead to offsets being
 committed in the wrong order.

 In this thread you have discussed this as well and some workaround  :


 https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15

 So again, one needs to understand every detail of a consumer to decide
 whether it solves their use case.

 Regards,
 Dibyendu

 On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger 
 wrote:

> As far as I can tell, Dibyendu's "cons" boil down to:
>
> 1. Spark checkpoints can't be recovered if you upgrade code
> 2. Some Spark transformations involve a shuffle, which can repartition
> data
>
> It's not accurate to imply that either one of those things are
> inherently "cons" of the direct stream api.
>
> Regarding checkpoints, nothing about the direct stream requires you to
> use checkpoints.  You can save offsets in a checkpoint, your own database,
> or

Re: Spark and Flink

2015-05-19 Thread Till Rohrmann
I guess it's a typo: "eu.stratosphere" should be replaced by
"org.apache.flink"

On Tue, May 19, 2015 at 1:13 PM, Alexander Alexandrov <
alexander.s.alexand...@gmail.com> wrote:

> We managed to do this with the following config:
>
> // properties
> <properties>
>     <hadoop.version>2.2.0</hadoop.version>
>     <flink.version>0.9-SNAPSHOT</flink.version>
>     <spark.version>1.2.1</spark.version>
> </properties>
>
> // from the dependency management
> <dependencyManagement>
>   <dependencies>
>     <dependency>
>       <groupId>org.apache.hadoop</groupId>
>       <artifactId>hadoop-common</artifactId>
>       <version>${hadoop.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>org.apache.hadoop</groupId>
>       <artifactId>hadoop-hdfs</artifactId>
>       <version>${hadoop.version}</version>
>       <scope>provided</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>eu.stratosphere</groupId>
>       <artifactId>flink-scala</artifactId>
>       <version>${flink.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>eu.stratosphere</groupId>
>       <artifactId>flink-java</artifactId>
>       <version>${flink.version}</version>
>       <scope>provided</scope>
>     </dependency>
>     <dependency>
>       <groupId>eu.stratosphere</groupId>
>       <artifactId>flink-clients</artifactId>
>       <version>${flink.version}</version>
>       <scope>provided</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>org.apache.spark</groupId>
>       <artifactId>spark-core_${scala.tools.version}</artifactId>
>       <version>${spark.version}</version>
>       <scope>provided</scope>
>     </dependency>
>
>     <dependency>
>       <groupId>org.eclipse.jetty</groupId>
>       <artifactId>jetty-util</artifactId>
>       <version>${jetty.version}</version>
>     </dependency>
>     <dependency>
>       <groupId>org.eclipse.jetty</groupId>
>       <artifactId>jetty-servlet</artifactId>
>       <version>${jetty.version}</version>
>     </dependency>
>   </dependencies>
> </dependencyManagement>
>
> // actual dependencies
> <dependencies>
>   <dependency>
>     <groupId>org.apache.spark</groupId>
>     <artifactId>spark-core_${scala.tools.version}</artifactId>
>   </dependency>
>
>   <dependency>
>     <groupId>eu.stratosphere</groupId>
>     <artifactId>flink-scala</artifactId>
>   </dependency>
>   <dependency>
>     <groupId>eu.stratosphere</groupId>
>     <artifactId>flink-java</artifactId>
>   </dependency>
>   <dependency>
>     <groupId>eu.stratosphere</groupId>
>     <artifactId>flink-clients</artifactId>
>   </dependency>
>
>   <dependency>
>     <groupId>com.fasterxml.jackson.core</groupId>
>     <artifactId>jackson-core</artifactId>
>     <version>2.2.1</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>com.fasterxml.jackson.core</groupId>
>     <artifactId>jackson-databind</artifactId>
>     <version>2.2.1</version>
>     <scope>provided</scope>
>   </dependency>
>   <dependency>
>     <groupId>com.fasterxml.jackson.core</groupId>
>     <artifactId>jackson-annotations</artifactId>
>     <version>2.2.1</version>
>     <scope>provided</scope>
>   </dependency>
> </dependencies>
>
>
> 2015-05-19 10:06 GMT+02:00 Pa Rö :
>
>> that sounds good, maybe you can send me a pseudo structure; this is my first
>> maven project.
>>
>> best regards,
>> paul
>>
>> 2015-05-18 14:05 GMT+02:00 Robert Metzger :
>>
>>> Hi,
>>> I would really recommend you to put your Flink and Spark dependencies
>>> into different maven modules.
>>> Having them both in the same project will be very hard, if not
>>> impossible.
>>> Both projects depend on similar projects with slightly different
>>> versions.
>>>
>>> I would suggest a maven module structure like this:
>>> yourproject-parent (a pom module)
>>> --> yourproject-common
>>> --> yourproject-flink
>>> --> yourproject-spark
>>>
>>>
>>>
>>> On Mon, May 18, 2015 at 10:00 AM, Pa Rö 
>>> wrote:
>>>
 hi,
 if I add your dependency I get over 100 errors, so now I have changed the
 version number:
 <dependency>
     <groupId>com.fasterxml.jackson.module</groupId>
     <artifactId>jackson-module-scala_2.10</artifactId>
     <version>2.4.4</version>
     <exclusions>
         <exclusion>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </exclusion>
     </exclusions>
 </dependency>

 now the pom is fine, but I get the same error when running spark:
 WARN component.AbstractLifeCycle: FAILED
 org.eclipse.jetty.servlet.DefaultServlet-608411067:
 java.lang.NoSuchMethodError:
 org.eclipse.jetty.server.ResourceCache.<init>(Lorg/eclipse/jetty/http/MimeTypes;)V

 java.lang.NoSuchMethodError:
 org.eclipse.jetty.server.ResourceCache.<init>(Lorg/eclipse/jetty/http/MimeTypes;)V
 at
 org.eclipse.jetty.servlet.NIOResourceCache.<init>(NIOResourceCache.java:41)
 at
 org.eclipse.jetty.servlet.DefaultServlet.init(DefaultServlet.java:223)
 at javax.servlet.GenericServlet.init(GenericServlet.java:244)
 at
 org.eclipse.jetty.servlet.ServletHolder.initServlet(ServletHolder.java:442)
 at
 org.eclipse.jetty.servlet.ServletHolder.doStart(ServletHolder.java:270)
 at
 org.eclipse.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64)
 at
 org.eclipse.jetty.servlet.ServletHandler.initialize(ServletHandler.java:721)
 at
 org.eclipse.jetty.servlet.ServletContextHandler.startContext(ServletContextHandler.java:279)
 at
 org.eclipse.jetty.server.handler.ContextHandler.doStart(ContextHandler.java:717)
 at
 org.eclipse.jetty.servlet.ServletContextHandler.doStart(ServletContextHandler.java:155)
 at
>>

Re: Reading Real Time Data only from Kafka

2015-05-19 Thread Cody Koeninger
If those questions aren't answered by

https://github.com/koeninger/kafka-exactly-once/blob/master/blogpost.md

please let me know so I can update it.

If you set auto.offset.reset to largest, it will start at the largest
offset.  Any messages before that will be skipped, so if prior runs of the
job didn't consume them, they're lost.

KafkaRDD / DirectStream doesn't make any scheduling decisions (aside from a
locality hint if you have kafka running on the same node as spark), and it
doesn't have any long-running receivers.  Executors get whatever partitions
the normal scheduler decides they should get.  If an executor fails, a
different executor reads the offset range for the failed partition; they're
immutable, so no difference in result.

Deciding where to save offsets (or not) is up to you.  You can checkpoint,
or store them yourself.
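
A minimal sketch of the direct stream described above (broker, topic and the saveOffsets hook are hypothetical placeholders): the per-partition rate limit is set through spark.streaming.kafka.maxRatePerPartition and the offset ranges of each batch are read back through HasOffsetRanges:

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange}

object DirectStreamSketch {
  // hypothetical hook: persist offsets transactionally in your own store
  def saveOffsets(ranges: Array[OffsetRange]): Unit =
    ranges.foreach(r => println(s"${r.topic}-${r.partition}: ${r.fromOffset} to ${r.untilOffset}"))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("direct-stream-sketch")
      .set("spark.streaming.kafka.maxRatePerPartition", "1000") // cap messages per second per partition
    val ssc = new StreamingContext(conf, Seconds(10))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker1:9092",
      "auto.offset.reset" -> "largest") // start from the latest offsets on a fresh run
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("events"))

    stream.foreachRDD { rdd =>
      val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      // ... process rdd here ...
      saveOffsets(ranges) // only after processing has succeeded
    }

    ssc.start()
    ssc.awaitTermination()
  }
}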

On Mon, May 18, 2015 at 12:00 PM, Akhil Das 
wrote:

> I have played a bit with the directStream kafka api. Good work cody. These
> are my findings and also can you clarify a few things for me (see below).
>
> -> When "auto.offset.reset"-> "smallest" and you have 60GB of messages in
> Kafka, it takes forever as it reads the whole 60GB at once. "largest" will
> only read the latest messages.
> -> To avoid this, you can actually limit the rate with
> spark.streaming.kafka.maxRatePerPartition, which is pretty stable (Always
> reads the same amount of data).
> -> Number of partitions per batch = number of kafka partitions.
>
> -> In the case of driver failures, offset reset being set to "smallest"
> will replay the whole messages and "largest" will only read those messages
> which are pushed after the streaming job has started. What happens to those
> messages which arrive in between?
>
> *Few things which are unclear:*
>
> -> If we have a kafka topic with 9 partitions, and a spark cluster with 3
> slaves, how does it decide which slave should read from which partition?
> And what happens if a single slave fails while reading the data?
>
> -> By default it doesn't push the offsets of messages which are read
> anywhere, then how does it replay the message in case of failures?
>
> Thanks
> Best Regards
>
> On Wed, May 13, 2015 at 8:32 PM, Cody Koeninger 
> wrote:
>
>> You linked to a google mail tab, not a public archive, so I don't know
>> exactly which conversation you're referring to.
>>
>> As far as I know, streaming only runs a single job at a time in the order
>> they were defined, unless you turn on an experimental option for more
>> parallelism (TD or someone more knowledgeable can chime in on this).  If
>> you're talking about the possibility of the next job starting before the
>> prior one has fully finished, because your processing is lagging behind...
>> I'm not 100% sure this is possible because I've never observed it.
>>
>> The thing is, it's a moot point, because if you're saving offsets
>> yourself transactionally, you already need to be verifying that offsets are
>> correct (increasing without gaps) in order to handle restarts correctly.
>>
>> If you're super concerned about how batches get generated, the direct api
>> gives you access to KafkaUtils.createRDD... just schedule your own rdds in
>> the order you want.  Again, flexible.
>>
>>
>>
>>
>> On Wed, May 13, 2015 at 9:36 AM, Dibyendu Bhattacharya <
>> dibyendu.bhattach...@gmail.com> wrote:
>>
>>> Thanks Cody for your email. I think my concern was not about getting the
>>> ordering of messages within a partition, which as you said is possible if
>>> one knows how Spark works. The issue is how Spark schedules jobs on every
>>> batch, which is not in the same order they were generated. So if that is not
>>> guaranteed, it does not matter if you manage order within your partition, and
>>> depending on per-partition ordering to commit offsets may lead to offsets being
>>> committed in the wrong order.
>>>
>>> In this thread you have discussed this as well and some workaround  :
>>>
>>>
>>> https://mail.google.com/mail/u/1/?tab=wm#search/rdd+order+guarantees/14b9f1eaf0b8bd15
>>>
>>> So again, one needs to understand every detail of a consumer to decide
>>> whether it solves their use case.
>>>
>>> Regards,
>>> Dibyendu
>>>
>>> On Wed, May 13, 2015 at 7:35 PM, Cody Koeninger 
>>> wrote:
>>>
 As far as I can tell, Dibyendu's "cons" boil down to:

 1. Spark checkpoints can't be recovered if you upgrade code
 2. Some Spark transformations involve a shuffle, which can repartition
 data

 It's not accurate to imply that either one of those things are
 inherently "cons" of the direct stream api.

 Regarding checkpoints, nothing about the direct stream requires you to
 use checkpoints.  You can save offsets in a checkpoint, your own database,
 or not save offsets at all (as James wants).  One might even say that the
 direct stream api is . . . flexible . . . in that regard.

 Regarding partitions, the direct stream api gives you the same ordering
 guarantee as Kafka, namely that w

Re: group by and distinct performance issue

2015-05-19 Thread Todd Nist
You may want to look at this tooling for helping identify performance
issues and bottlenecks:

https://github.com/kayousterhout/trace-analysis

I believe this is slated to become part of the web ui in the 1.4 release,
in fact based on the status of the JIRA,
https://issues.apache.org/jira/browse/SPARK-6418, looks like it is complete.


On Tue, May 19, 2015 at 3:56 AM, Akhil Das 
wrote:

> Hi Peer,
>
> If you open the driver UI (running on port 4040) you can see the stages
> and the tasks happening inside them. The best way to identify the bottleneck for
> a stage is to see whether there is any time spent on GC, and how many tasks
> there are per stage (it should be a number > total # cores to achieve max
> parallelism). You can also look at how long each individual task takes.
>
> Thanks
> Best Regards
>
> On Tue, May 19, 2015 at 12:58 PM, Peer, Oded  wrote:
>
>>  I am running Spark over Cassandra to process a single table.
>>
>> My task reads a single day's worth of data from the table and performs 50
>> group by and distinct operations, counting distinct userIds by different
>> grouping keys.
>>
>> My code looks like this:
>>
>>
>>
>>JavaRdd rdd = sc.parallelize().mapPartitions().cache() // reads
>> the data from the table
>>
>>for each groupingKey {
>>
>>   JavaPairRdd groupByRdd = rdd.mapToPair();
>>
>>   JavaPairRDD countRdd =
>> groupByRdd.distinct().mapToPair().reduceByKey() // counts distinct values
>> per grouping key
>>
>>}
>>
>>
>>
>> The distinct() stage takes about 2 minutes for every groupByValue, and my
>> task takes well over an hour to complete.
>>
>> My cluster has 4 nodes and 30 GB of RAM per Spark process, the table size
>> is 4 GB.
>>
>>
>>
>> How can I identify the bottleneck more accurately? Is it caused by
>> shuffling data?
>>
>> How can I improve the performance?
>>
>>
>>
>> Thanks,
>>
>> Oded
>>
>
>
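
As a hedged sketch for the question above (the Event record and field names are invented), distinct userIds per grouping key can be counted in a single aggregateByKey pass, which folds the distinct step and the count into one stage instead of running a separate distinct() per grouping key:

import org.apache.spark.rdd.RDD
import scala.collection.mutable

// hypothetical record type standing in for one row of the Cassandra table
case class Event(userId: String, country: String, device: String)

object DistinctPerKeySketch {
  def distinctUsersBy(events: RDD[Event], key: Event => String): RDD[(String, Int)] =
    events
      .map(e => (key(e), e.userId))
      .aggregateByKey(mutable.HashSet.empty[String])(
        (set, user) => set += user,   // collect distinct users within a partition
        (a, b) => a ++= b)            // merge the sets across partitions
      .mapValues(_.size)              // distinct user count per grouping key
}

Calling it as distinctUsersBy(rdd, _.country), distinctUsersBy(rdd, _.device) and so on still runs one shuffle per grouping key, but removes the extra distinct() stage.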


Re: Reading Binary files in Spark program

2015-05-19 Thread Akhil Das
Try something like:

JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(inputDir,
  org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
IntWritable.class,
  Text.class, new Job().getConfiguration());

With the type of input format that you require.

Thanks
Best Regards
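
A hedged Scala sketch of another common fix, since Text and BytesWritable are not serializable: convert the Writables to plain serializable types while the data is still distributed, before any collect():

import org.apache.hadoop.io.{BytesWritable, Text}
import org.apache.spark.{SparkConf, SparkContext}

object SequenceFileSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("seqfile-sketch"))
    // keys are file names (Text), values are raw image bytes (BytesWritable)
    val raw = sc.sequenceFile(args(0), classOf[Text], classOf[BytesWritable])
    // copy into serializable types; Hadoop reuses Writable instances, so copy before collecting
    val images: Array[(String, Array[Byte])] =
      raw.map { case (name, bytes) => (name.toString, bytes.copyBytes()) }.collect()
    println(s"read ${images.length} images")
    sc.stop()
  }
}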

On Tue, May 19, 2015 at 3:57 PM, Tapan Sharma 
wrote:

> Hi Team,
>
> I am new to Spark and learning.
> I am trying to read image files into spark job. This is how I am doing:
> Step 1. Created sequence files with FileName as Key and Binary image as
> value. i.e.  Text and BytesWritable.
> I am able to read these sequence files into Map Reduce programs.
>
> Step 2.
> I understand that Text and BytesWritable are Non Serializable therefore, I
> read the sequence file in Spark as following:
>
> SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
> JavaSparkContext ctx = new JavaSparkContext(sparkConf);
> JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0],
> String.class, Byte.class);
> final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();
>
>
>
> The moment I try to call collect() method to get the keys of sequence file,
> following exception has been thrown
>
> Can anyone help me understand why the collect() method is failing? If I use
> toArray() on the seqFiles object, I get the same call stack.
>
> Regards
> Tapan
>
>
>
> java.io.NotSerializableException: org.apache.hadoop.io.Text
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
> at
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
> at
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
> at
>
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
> at
> java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
> at
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
> at
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
> at
>
> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
> at
>
> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
> at
>
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> at
>
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> 2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
> scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage
> 0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not
> retrying
> 2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
> 0.0, whose tasks have all completed, from pool
> 2015-05-19 15:15:03,739 INFO  [sparkDriver-akka.actor.default-dispatcher-2]
> scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage
> 0
> 2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
> (Logging.scala:logInfo(59)) - Job 0 failed: collect at
> JavaSequenceFile.java:44, took 4.421397 s
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due
> to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
> result: org.apache.hadoop.io.Text
> at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
> at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
> at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
> at
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
> at scala.Option.foreach(Option.scala:236)
> at
>
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
> at
>
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
> at
>
> akka.dispa

Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Tested with HiveContext also. It also takes a similar amount of time.

To make things clear, the following is the select clause generated for a given column:

aggregateStats( "$columnName", max(cast($columnName as double)),
min(cast($columnName as double)), avg(cast($columnName as double)),
count(*) )

aggregateStats is a UDF generating a case class to hold the values.








Regards,
Madhukara Phatak
http://datamantra.io/
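
As a hedged sketch (df stands for the 26k-column DataFrame loaded via spark-csv, and the column loop is illustrative), the same per-column statistics can be built with the DataFrame API instead of generating one huge SQL string, which at least bypasses the SQL parser; whether it also reduces the analysis time would need to be measured:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType

object ColumnStatsSketch {
  def stats(df: DataFrame): DataFrame = {
    // max/min/avg for every column (cast to double), plus a single row count
    val perColumn = df.columns.flatMap { c =>
      val col = df(c).cast(DoubleType)
      Seq(max(col).as(s"max_$c"), min(col).as(s"min_$c"), avg(col).as(s"avg_$c"))
    }
    val exprs = count(lit(1)).as("row_count") +: perColumn
    df.agg(exprs.head, exprs.tail: _*)
  }
}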

On Tue, May 19, 2015 at 5:57 PM, madhu phatak  wrote:

> Hi,
> Tested for calculating values for 300 columns. Analyser takes around 4
> minutes to generate the plan. Is this normal?
>
>
>
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>
> On Tue, May 19, 2015 at 4:35 PM, madhu phatak 
> wrote:
>
>> Hi,
>> I am using spark 1.3.1
>>
>>
>>
>>
>> Regards,
>> Madhukara Phatak
>> http://datamantra.io/
>>
>> On Tue, May 19, 2015 at 4:34 PM, Wangfei (X)  wrote:
>>
>>>  And which version are you using
>>>
>>> Sent from my iPhone
>>>
>>> On May 19, 2015, at 18:29, "ayan guha"  wrote:
>>>
>>>   can you kindly share your code?
>>>
>>> On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
>>> wrote:
>>>
 Hi,
 I  am trying run spark sql aggregation on a file with 26k columns. No
 of rows is very small. I am running into issue that spark is taking huge
 amount of time to parse the sql and create a logical plan. Even if i have
 just one row, it's taking more than 1 hour just to get pass the parsing.
 Any idea how to optimize in these kind of scenarios?


  Regards,
  Madhukara Phatak
 http://datamantra.io/

>>>
>>>
>>>
>>>  --
>>> Best Regards,
>>> Ayan Guha
>>>
>>>
>>
>


Re: Spark sql error while writing Parquet file- Trying to write more fields than contained in row

2015-05-19 Thread Todd Nist
I believe you're looking for df.na.fill in Scala; in the pySpark module it is
fillna (http://spark.apache.org/docs/latest/api/python/pyspark.sql.html)

from the docs:

df4.fillna({'age': 50, 'name': 'unknown'}).show()
age height name
10  80     Alice
5   null   Bob
50  null   Tom
50  null   unknown
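
A rough Scala equivalent, assuming a DataFrame df with the double columns a, b and c used further down in this thread:

import org.apache.spark.sql.DataFrame

// fill nulls in the listed double columns with 0.0 before writing parquet
def fillAndSave(df: DataFrame, path: String): Unit = {
  val filled = df.na.fill(0.0, Seq("a", "b", "c"))
  filled.saveAsParquetFile(path)
}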


On Mon, May 18, 2015 at 11:01 PM, Chandra Mohan, Ananda Vel Murugan <
ananda.muru...@honeywell.com> wrote:

>  Hi,
>
>
>
> Thanks for the response. But I could not see fillna function in DataFrame
> class.
>
>
>
>
>
>
>
> Is it available in some specific version of Spark sql. This is what I have
> in my pom.xml
>
>
>
> 
>
>   org.apache.spark
>
>   spark-sql_2.10
>
>   1.3.1
>
>
>
>
>
> Regards,
>
> Anand.C
>
>
>
> *From:* ayan guha [mailto:guha.a...@gmail.com]
> *Sent:* Monday, May 18, 2015 5:19 PM
> *To:* Chandra Mohan, Ananda Vel Murugan; user
> *Subject:* Re: Spark sql error while writing Parquet file- Trying to
> write more fields than contained in row
>
>
>
> Hi
>
>
>
> Give a try with dtaFrame.fillna function to fill up missing column
>
>
>
> Best
>
> Ayan
>
>
>
> On Mon, May 18, 2015 at 8:29 PM, Chandra Mohan, Ananda Vel Murugan <
> ananda.muru...@honeywell.com> wrote:
>
> Hi,
>
>
>
> I am using spark-sql to read a CSV file and write it as parquet file. I am
> building the schema using the following code.
>
>
>
> String schemaString = "a b c";
>
>    List<StructField> fields = new ArrayList<StructField>();
>
>    MetadataBuilder mb = new MetadataBuilder();
>
>    mb.putBoolean("nullable", true);
>
>    Metadata m = mb.build();
>
>    for (String fieldName : schemaString.split(" ")) {
>
>        fields.add(new StructField(fieldName, DataTypes.DoubleType, true, m));
>
>    }
>
>    StructType schema = DataTypes.createStructType(fields);
>
>
>
> Some of the rows in my input csv do not contain three columns. After
> building my JavaRDD<Row>, I create a data frame as shown below using the
> RDD and schema.
>
>
>
> DataFrame darDataFrame = sqlContext.createDataFrame(rowRDD, schema);
>
>
>
> Finally I try to save it as Parquet file
>
>
>
> darDataFrame.saveAsParquetFile("/home/anand/output.parquet")
>
>
>
> I get this error when saving it as Parquet file
>
>
>
> java.lang.IndexOutOfBoundsException: Trying to write more fields than
> contained in row (3 > 2)
>
>
>
> I understand the reason behind this error. Some of the rows in my Row RDD do
> not contain three elements, because some rows in my input csv do not contain
> three columns. But while building the schema, I am specifying every field
> as nullable, so I believe it should not throw this error. Can anyone help
> me fix this error? Thank you.
>
>
>
> Regards,
>
> Anand.C
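
One hedged reading of the error (the helper below is illustrative, not the poster's code): marking fields nullable does not pad short rows, so every Row still has to carry exactly as many values as the schema has fields, for example:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// pad short CSV lines with nulls so every Row matches the schema arity
def toRows(lines: RDD[String], schema: StructType): RDD[Row] = {
  val width = schema.fields.length
  lines.map { line =>
    val parts = line.split(",", -1).padTo(width, "")
    Row.fromSeq(parts.take(width).map(s => if (s.isEmpty) null else s.toDouble))
  }
}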
>
>
>
>
>
>
>
>
>
> --
>
> Best Regards,
> Ayan Guha
>


RE: Decision tree: categorical variables

2015-05-19 Thread Keerthi
Hi ,

can you please share how you resolved the parsing issue? It would be of great
help...

Thanks.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Decision-tree-categorical-variables-tp12433p22943.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Tested for calculating values for 300 columns. Analyser takes around 4
minutes to generate the plan. Is this normal?




Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 4:35 PM, madhu phatak  wrote:

> Hi,
> I am using spark 1.3.1
>
>
>
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>
> On Tue, May 19, 2015 at 4:34 PM, Wangfei (X)  wrote:
>
>>  And which version are you using
>>
>> Sent from my iPhone
>>
>> On May 19, 2015, at 18:29, "ayan guha"  wrote:
>>
>>   can you kindly share your code?
>>
>> On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
>> wrote:
>>
>>> Hi,
>>> I am trying to run spark sql aggregation on a file with 26k columns. The number of
>>> rows is very small. I am running into an issue where spark is taking a huge
>>> amount of time to parse the sql and create a logical plan. Even if I have
>>> just one row, it's taking more than 1 hour just to get past the parsing.
>>> Any idea how to optimize these kinds of scenarios?
>>>
>>>
>>>  Regards,
>>>  Madhukara Phatak
>>> http://datamantra.io/
>>>
>>
>>
>>
>>  --
>> Best Regards,
>> Ayan Guha
>>
>>
>


Re: How to use spark to access HBase with Security enabled

2015-05-19 Thread donhoff_h
The principal is sp...@bgdt.dev.hrb. It is the user that I used to run my spark 
programs. I am sure I have run the kinit command to make it take effect. And I 
also used the HBase Shell to verify that this user has the right to scan and 
put the tables in HBase.


Now I still have no idea how to solve this problem. Can anybody help me to 
figure it out? Many Thanks!


------ Original Message ------
From: "yuzhihong";
Date: May 19, 2015, 7:55
To: "donhoff_h" <165612...@qq.com>;
Cc: "user";
Subject: Re: How to use spark to access HBase with Security enabled



Which user did you run your program as ?


Have you granted proper permission on hbase side ?


You should also check master log to see if there was some clue. 


Cheers




On May 19, 2015, at 2:41 AM, donhoff_h <165612...@qq.com> wrote:


Hi, experts.

I ran the "HBaseTest" program which is an example from the Apache Spark source 
code to learn how to use spark to access HBase. But I met the following 
exception:
Exception in thread "main" 
org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after 
attempts=36, exceptions:
Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: 
callTimeout=6, callDuration=68648: row 'spark_t01,,00' on table 
'hbase:meta' at region=hbase:meta,,1.1588230740, 
hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0

I also checked the RegionServer Log of the host "bgdt01.dev.hrb" listed in the 
above exception. I found a few entries like the following one:
2015-05-19 16:59:11,143 DEBUG 
[RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: 
RpcServer.listener,port=16020: Caught exception while reading:Authentication is 
required 

The above entry does not point to my program clearly, but the time is very close.
Since my hbase version is HBase 1.0.0 and I have security enabled, I suspect the
exception was caused by the Kerberos authentication, but I am not sure.

Does anybody know if my guess is right? And if I am right, could anybody tell me
how to set up Kerberos authentication in a spark program? I don't know how to do
it. I already checked the API doc, but did not find any useful API. Many
Thanks!

By the way, my spark version is 1.3.0. I also paste the code of "HBaseTest" in 
the following:
***Source Code**
object HBaseTest {
  def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HBaseTest")
val sc = new SparkContext(sparkConf)
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, args(0))

// Initialize hBase table if necessary
val admin = new HBaseAdmin(conf)
if (!admin.isTableAvailable(args(0))) {
  val tableDesc = new HTableDescriptor(args(0))
  admin.createTable(tableDesc)
}


val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
  classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  classOf[org.apache.hadoop.hbase.client.Result])


hBaseRDD.count()


sc.stop()
  }
}
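
For the Kerberos question above, one commonly suggested starting point is to log in explicitly from a keytab before touching HBase. This is only a hedged sketch: the principal and keytab path are placeholders, and depending on the cluster setup the executors may additionally need HBase delegation tokens, which this snippet does not provide.

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.security.UserGroupInformation

// placeholder principal and keytab; replace with your own
val principal = "someuser@EXAMPLE.REALM"
val keytab = "/path/to/someuser.keytab"

val hbaseConf = HBaseConfiguration.create()
UserGroupInformation.setConfiguration(hbaseConf)
UserGroupInformation.loginUserFromKeytab(principal, keytab)
// ... then create the HBaseAdmin / newAPIHadoopRDD as in the code above ...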

Re: How to use spark to access HBase with Security enabled

2015-05-19 Thread Ted Yu
Which user did you run your program as ?

Have you granted proper permission on hbase side ?

You should also check master log to see if there was some clue. 

Cheers



> On May 19, 2015, at 2:41 AM, donhoff_h <165612...@qq.com> wrote:
> 
> Hi, experts.
> 
> I ran the "HBaseTest" program which is an example from the Apache Spark 
> source code to learn how to use spark to access HBase. But I met the 
> following exception:
> Exception in thread "main" 
> org.apache.hadoop.hbase.client.RetriesExhaustedException: Failed after 
> attempts=36, exceptions:
> Tue May 19 16:59:11 CST 2015, null, java.net.SocketTimeoutException: 
> callTimeout=6, callDuration=68648: row 'spark_t01,,00' on 
> table 'hbase:meta' at region=hbase:meta,,1.1588230740, 
> hostname=bgdt01.dev.hrb,16020,1431412877700, seqNum=0
> 
> I also checked the RegionServer Log of the host "bgdt01.dev.hrb" listed in 
> the above exception. I found a few entries like the following one:
> 2015-05-19 16:59:11,143 DEBUG 
> [RpcServer.reader=2,bindAddress=bgdt01.dev.hrb,port=16020] ipc.RpcServer: 
> RpcServer.listener,port=16020: Caught exception while reading:Authentication 
> is required 
> 
> The above entry does not point to my program clearly, but the time is very
> close. Since my hbase version is HBase 1.0.0 and I have security enabled, I
> suspect the exception was caused by the Kerberos authentication, but I am not
> sure.
> 
> Does anybody know if my guess is right? And if I am right, could anybody tell
> me how to set up Kerberos authentication in a spark program? I don't know how to
> do it. I already checked the API doc, but did not find any useful API. Many
> Thanks!
> 
> By the way, my spark version is 1.3.0. I also paste the code of "HBaseTest" 
> in the following:
> ***Source Code**
> object HBaseTest {
>   def main(args: Array[String]) {
> val sparkConf = new SparkConf().setAppName("HBaseTest")
> val sc = new SparkContext(sparkConf)
> val conf = HBaseConfiguration.create()
> conf.set(TableInputFormat.INPUT_TABLE, args(0))
> 
> // Initialize hBase table if necessary
> val admin = new HBaseAdmin(conf)
> if (!admin.isTableAvailable(args(0))) {
>   val tableDesc = new HTableDescriptor(args(0))
>   admin.createTable(tableDesc)
> }
> 
> val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],
>   classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
>   classOf[org.apache.hadoop.hbase.client.Result])
> 
> hBaseRDD.count()
> 
> sc.stop()
>   }
> }
> 


Re: Spark SQL on large number of columns

2015-05-19 Thread Wangfei (X)
And which version are you using?

Sent from my iPhone

On May 19, 2015, at 18:29, "ayan guha" 
mailto:guha.a...@gmail.com>> wrote:

can you kindly share your code?

On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
mailto:phatak@gmail.com>> wrote:
Hi,
I  am trying run spark sql aggregation on a file with 26k columns. No of rows 
is very small. I am running into issue that spark is taking huge amount of time 
to parse the sql and create a logical plan. Even if i have just one row, it's 
taking more than 1 hour just to get pass the parsing. Any idea how to optimize 
in these kind of scenarios?


Regards,
Madhukara Phatak
http://datamantra.io/



--
Best Regards,
Ayan Guha


Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
I am using spark 1.3.1




Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 4:34 PM, Wangfei (X)  wrote:

>  And which version are you using
>
> Sent from my iPhone
>
> On May 19, 2015, at 18:29, "ayan guha"  wrote:
>
>   can you kindly share your code?
>
> On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
> wrote:
>
>> Hi,
>> I  am trying run spark sql aggregation on a file with 26k columns. No of
>> rows is very small. I am running into issue that spark is taking huge
>> amount of time to parse the sql and create a logical plan. Even if i have
>> just one row, it's taking more than 1 hour just to get pass the parsing.
>> Any idea how to optimize in these kind of scenarios?
>>
>>
>>  Regards,
>>  Madhukara Phatak
>> http://datamantra.io/
>>
>
>
>
>  --
> Best Regards,
> Ayan Guha
>
>


RE: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Ewan Leith
Thanks Cheng, that's brilliant, you've saved me a headache.

Ewan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: 19 May 2015 11:58
To: Ewan Leith; user@spark.apache.org
Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or 
createDataFrame Interfaces?

That's right. Also, Spark SQL can automatically infer schema from JSON 
datasets. You don't need to specify an Avro schema:

   sqlContext.jsonFile("json/path").saveAsParquetFile("parquet/path")

or with the new reader/writer API introduced in 1.4-SNAPSHOT:

   sqlContext.read.json("json/path").write.parquet("parquet/path")

Cheng
On 5/19/15 6:07 PM, Ewan Leith wrote:
Thanks Cheng, that makes sense.

So for new dataframe creation (not conversion from Avro but from JSON or CSV 
inputs) in Spark we shouldn't worry about using Avro at all, just use the Spark 
SQL StructType when building new Dataframes? If so, that will be a lot simpler!

Thanks,
Ewan

From: Cheng Lian [mailto:lian.cs@gmail.com]
Sent: 19 May 2015 11:01
To: Ewan Leith; user@spark.apache.org
Subject: Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or 
createDataFrame Interfaces?

Hi Ewan,

Different from AvroParquetWriter, in Spark SQL we uses StructType as the 
intermediate schema format. So when converting Avro files to Parquet files, we 
internally converts Avro schema to Spark SQL StructType first, and then convert 
StructType to Parquet schema.

Cheng
On 5/19/15 4:42 PM, Ewan Leith wrote:
Hi all,

I might be missing something, but does the new Spark 1.3 sqlContext save 
interface support using Avro as the schema structure when writing Parquet 
files, in a similar way to AvroParquetWriter (which I've got working)?

I've seen how you can load an avro file and save it as parquet from 
https://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html,
 but not using the 2 together.

Thanks, and apologies if I've missed something obvious!

Ewan




Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext Save or createDataFrame Interfaces?

2015-05-19 Thread Cheng Lian
That's right. Also, Spark SQL can automatically infer schema from JSON 
datasets. You don't need to specify an Avro schema:


sqlContext.jsonFile("json/path").saveAsParquetFile("parquet/path")

or with the new reader/writer API introduced in 1.4-SNAPSHOT:

   sqlContext.read.json("json/path").write.parquet("parquet/path")

Cheng
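
For the "build new DataFrames with StructType" case Ewan asks about, a minimal sketch (column names, sample rows and the output path are made up):

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

def writeParquet(sqlContext: SQLContext): Unit = {
  val schema = StructType(Seq(
    StructField("name", StringType, nullable = true),
    StructField("score", DoubleType, nullable = true)))
  val rows = sqlContext.sparkContext.parallelize(Seq(Row("alice", 1.0), Row("bob", 2.0)))
  val df = sqlContext.createDataFrame(rows, schema)
  df.saveAsParquetFile("parquet/path")
}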

On 5/19/15 6:07 PM, Ewan Leith wrote:


Thanks Cheng, that makes sense.

So for new dataframe creation (not conversion from Avro but from JSON 
or CSV inputs) in Spark we shouldn’t worry about using Avro at all, 
just use the Spark SQL StructType when building new Dataframes? If so, 
that will be a lot simpler!


Thanks,

Ewan

*From:*Cheng Lian [mailto:lian.cs@gmail.com]
*Sent:* 19 May 2015 11:01
*To:* Ewan Leith; user@spark.apache.org
*Subject:* Re: AvroParquetWriter equivalent in Spark 1.3 sqlContext 
Save or createDataFrame Interfaces?


Hi Ewan,

Different from AvroParquetWriter, in Spark SQL we use StructType as 
the intermediate schema format. So when converting Avro files to 
Parquet files, we internally convert the Avro schema to a Spark SQL 
StructType first, and then convert the StructType to a Parquet schema.


Cheng

On 5/19/15 4:42 PM, Ewan Leith wrote:

Hi all,

I might be missing something, but does the new Spark 1.3
sqlContext save interface support using Avro as the schema
structure when writing Parquet files, in a similar way to
AvroParquetWriter (which I’ve got working)?

I've seen how you can load an avro file and save it as parquet

fromhttps://databricks.com/blog/2015/03/24/spark-sql-graduates-from-alpha-in-spark-1-3.html,
but not using the 2 together.

Thanks, and apologies if I've missed something obvious!

Ewan





Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
Additional information: the table is backed by a csv file which is read
using spark-csv from databricks.




Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 4:05 PM, madhu phatak  wrote:

> Hi,
> I have fields from field_0 to field_26000. The query is a select on
>
> max( cast($columnName as double)),
> min(cast($columnName as double)), avg(cast($columnName as double)), 
> count(*)
>
> for all those 26000 fields in one query.
>
>
>
>
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>
> On Tue, May 19, 2015 at 3:59 PM, ayan guha  wrote:
>
>> can you kindly share your code?
>>
>> On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
>> wrote:
>>
>>> Hi,
>>> I  am trying run spark sql aggregation on a file with 26k columns. No of
>>> rows is very small. I am running into issue that spark is taking huge
>>> amount of time to parse the sql and create a logical plan. Even if i have
>>> just one row, it's taking more than 1 hour just to get pass the parsing.
>>> Any idea how to optimize in these kind of scenarios?
>>>
>>>
>>> Regards,
>>> Madhukara Phatak
>>> http://datamantra.io/
>>>
>>
>>
>>
>> --
>> Best Regards,
>> Ayan Guha
>>
>
>


Re: Spark SQL on large number of columns

2015-05-19 Thread madhu phatak
Hi,
I have fields from field_0 to field_26000. The query is a select on

max( cast($columnName as double)),
   min(cast($columnName as double)), avg(cast($columnName as double)), count(*)

for all those 26000 fields in one query.





Regards,
Madhukara Phatak
http://datamantra.io/

On Tue, May 19, 2015 at 3:59 PM, ayan guha  wrote:

> can you kindly share your code?
>
> On Tue, May 19, 2015 at 8:04 PM, madhu phatak 
> wrote:
>
>> Hi,
>> I  am trying run spark sql aggregation on a file with 26k columns. No of
>> rows is very small. I am running into issue that spark is taking huge
>> amount of time to parse the sql and create a logical plan. Even if i have
>> just one row, it's taking more than 1 hour just to get pass the parsing.
>> Any idea how to optimize in these kind of scenarios?
>>
>>
>> Regards,
>> Madhukara Phatak
>> http://datamantra.io/
>>
>
>
>
> --
> Best Regards,
> Ayan Guha
>


Re: Spark SQL on large number of columns

2015-05-19 Thread ayan guha
can you kindly share your code?

On Tue, May 19, 2015 at 8:04 PM, madhu phatak  wrote:

> Hi,
> I am trying to run spark sql aggregation on a file with 26k columns. The number of
> rows is very small. I am running into an issue where spark is taking a huge
> amount of time to parse the sql and create a logical plan. Even if I have
> just one row, it's taking more than 1 hour just to get past the parsing.
> Any idea how to optimize these kinds of scenarios?
>
>
> Regards,
> Madhukara Phatak
> http://datamantra.io/
>



-- 
Best Regards,
Ayan Guha


Reading Binary files in Spark program

2015-05-19 Thread Tapan Sharma
Hi Team,

I am new to Spark and learning.
I am trying to read image files into a Spark job. This is how I am doing it:
Step 1. Created sequence files with FileName as Key and Binary image as
value. i.e.  Text and BytesWritable.
I am able to read these sequence files into Map Reduce programs.

Step 2.
I understand that Text and BytesWritable are not Serializable; therefore, I
read the sequence file in Spark as follows:

SparkConf sparkConf = new SparkConf().setAppName("JavaSequenceFile");
JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaPairRDD<String, Byte> seqFiles = ctx.sequenceFile(args[0],
String.class, Byte.class);
final List<Tuple2<String, Byte>> tuple2s = seqFiles.collect();

   

The moment I try to call collect() method to get the keys of sequence file,
following exception has been thrown

Can anyone help me understand why the collect() method is failing? If I use
toArray() on the seqFiles object, I get the same call stack.

Regards
Tapan



java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at
java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at 
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1377)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1173)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at
org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at
org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:206)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
2015-05-19 15:15:03,705 ERROR [task-result-getter-0]
scheduler.TaskSetManager (Logging.scala:logError(75)) - Task 0.0 in stage
0.0 (TID 0) had a not serializable result: org.apache.hadoop.io.Text; not
retrying
2015-05-19 15:15:03,731 INFO  [task-result-getter-0]
scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Removed TaskSet
0.0, whose tasks have all completed, from pool 
2015-05-19 15:15:03,739 INFO  [sparkDriver-akka.actor.default-dispatcher-2]
scheduler.TaskSchedulerImpl (Logging.scala:logInfo(59)) - Cancelling stage 0
2015-05-19 15:15:03,747 INFO  [main] scheduler.DAGScheduler
(Logging.scala:logInfo(59)) - Job 0 failed: collect at
JavaSequenceFile.java:44, took 4.421397 s
Exception in thread "main" org.apache.spark.SparkException: Job aborted due
to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable
result: org.apache.hadoop.io.Text
at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
at scala.Option.foreach(Option.scala:236)
at
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
at
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)






--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Reading-Binary-files-in-Spark-p
